diff --git a/lisa/microsoft/testsuites/xfstests/xfstesting.py b/lisa/microsoft/testsuites/xfstests/xfstesting.py index 810419dc2d..4755b45e74 100644 --- a/lisa/microsoft/testsuites/xfstests/xfstesting.py +++ b/lisa/microsoft/testsuites/xfstests/xfstesting.py @@ -1355,9 +1355,6 @@ def verify_azure_file_share( run_id_prefix="cifs_worker", ) - # Send deferred notifications now that parallel execution is complete - runner.send_deferred_notifications(worker_results, result) - # Aggregate results (raises on failure) _, _, ctx.test_failed = runner.aggregate_results(worker_results) @@ -1478,9 +1475,6 @@ def verify_azure_file_share_nfsv4( run_id_prefix="nfs_worker", ) - # Send deferred notifications now that parallel execution is complete - runner.send_deferred_notifications(worker_results, result) - # Aggregate results (raises on failure) _, _, ctx.test_failed = runner.aggregate_results(worker_results) diff --git a/lisa/microsoft/testsuites/xfstests/xfstests.py b/lisa/microsoft/testsuites/xfstests/xfstests.py index e102cb51c1..8625d7a4db 100644 --- a/lisa/microsoft/testsuites/xfstests/xfstests.py +++ b/lisa/microsoft/testsuites/xfstests/xfstests.py @@ -77,15 +77,12 @@ test_section="cifs", timeout=3600, ) - runner.send_deferred_notifications(results, result) # Send notifications runner.aggregate_results(results) # Raises if any failures finally: runner.cleanup_workers() """ -import random import re -import time from dataclasses import dataclass, field from functools import partial from pathlib import Path, PurePath, PurePosixPath @@ -106,13 +103,8 @@ Ubuntu, ) from lisa.testsuite import TestResult -from lisa.tools import Cat, Chmod, Diff, Echo, Git, Make, Rm, Sed -from lisa.util import ( - LisaException, - UnsupportedDistroException, - find_patterns_in_lines, - generate_random_chars, -) +from lisa.tools import Cat, Chmod, Diff, Echo, Git, Make, Pgrep, Rm, Sed +from lisa.util import LisaException, UnsupportedDistroException, find_patterns_in_lines from lisa.util.parallel import run_in_parallel # ============================================================================= @@ -152,11 +144,6 @@ class XfstestsRunResult: Result object returned by run_test() to support parallel execution. Instead of raising immediately on failure, this allows callers to aggregate results from multiple parallel runs before deciding how to fail. - - For parallel execution, raw_message and data_disk are stored to enable - deferred notification sending after all workers complete. This prevents - deadlock when multiple workers try to send SubTestResult notifications - concurrently to the single-threaded notifier message manager. """ success: bool = True @@ -166,9 +153,6 @@ class XfstestsRunResult: fail_info: str = "" run_id: str = "" test_section: str = "" - # Fields for deferred notification support (parallel execution) - raw_message: str = "" # Raw xfstests output for sending notifications later - data_disk: str = "" # Data disk info for notification context def get_failure_message(self) -> str: """Generate a formatted failure message for this run.""" @@ -223,7 +207,6 @@ class XfstestsParallelRunner: try: batches = runner.split_tests(test_list) results = runner.run_parallel(batches, log_path, result, "cifs", 3600) - runner.send_deferred_notifications(results, result) runner.aggregate_results(results) finally: runner.cleanup_workers() @@ -393,31 +376,36 @@ def run_worker( tests: List[str], worker_path: PurePath, ) -> "XfstestsRunResult": - """Execute xfstests for a single worker.""" + """Execute xfstests for a single worker (execution only).""" run_id = f"{run_id_prefix}_{worker_id}" self.log.info( f"Worker {worker_id}: Starting {len(tests)} tests from {worker_path}" ) test_cases_str = " ".join(tests) - worker_result = self.xfstests.run_test( + + # Build the check command the same way run_test() does, + # but call _execute_test() directly - no result collection. + # Result collection happens in Phase 2 (sequential). + cmd = "" + if test_group: + cmd += f" -g {test_group}" + if test_section: + cmd += f" -s {test_section}" + cmd += " -E exclude.txt" + if test_cases_str: + cmd += f" {test_cases_str}" + console_log_name = f"xfstest_{run_id}.log" + cmd += f" > {console_log_name} 2>&1" + check_cmd = f"{worker_path}/check{cmd}" + + worker_result = self.xfstests._execute_test( + check_cmd=check_cmd, + working_path=worker_path, + run_id=run_id, test_section=test_section, - test_group=test_group, - log_path=log_path, - result=result, - test_cases=test_cases_str, timeout=worker_timeout, - run_id=run_id, - raise_on_failure=False, - xfstests_path=worker_path, - send_notifications=False, # Defer notifications to avoid deadlock - ) - # Log completion with result summary at INFO level for console visibility - status = "PASSED" if worker_result.success else "FAILED" - self.log.info( - f"Worker {worker_id}: {status} - " - f"{worker_result.total_count} tests, " - f"{worker_result.fail_count} failed" ) + self.log.info(f"Worker {worker_id}: Execution completed") return worker_result # Create task list for parallel execution @@ -431,12 +419,65 @@ def run_worker( f"{batch[:3]}{'...' if len(batch) > 3 else ''}" ) - # Execute all workers in parallel + # Phase 1: Execute all workers in parallel (no SSH-heavy result collection) self.log.info(f"Starting {len(tasks)} parallel xfstests workers...") - worker_results = run_in_parallel(tasks, log=self.log) - self.log.info("All parallel workers completed") + execution_results = list(run_in_parallel(tasks, log=self.log)) + self.log.info( + "All parallel workers completed execution. " + "Collecting results sequentially..." + ) - return worker_results + # Phase 2: Collect results sequentially to avoid SSH channel deadlock. + # check_test_results() makes many SSH calls (shell.exists, Cat.run, + # shell.copy_back) that deadlock when multiple threads compete for + # SSH channels. Running them one at a time is safe. + # Log names and worker paths are reconstructed from run_id and the + # runner's worker_paths list - no need to store them in the result. + final_results: List["XfstestsRunResult"] = [] + for idx, exec_result in enumerate(execution_results): + if not exec_result.success: + # Execution error occurred - skip result collection + self.log.warning( + f"[{exec_result.run_id}] Skipping result collection " + f"due to execution error: {exec_result.fail_info}" + ) + final_results.append(exec_result) + continue + + # Reconstruct paths from run_id and worker index + worker_path = self.worker_paths[idx] + console_log_name = f"xfstest_{exec_result.run_id}.log" + check_log_name = f"check_{exec_result.run_id}.log" + + # Collect results sequentially for this worker + try: + self.log.debug( + f"[{exec_result.run_id}] Collecting results from {worker_path}" + ) + final_result = self.xfstests.check_test_results( + log_path=log_path, + test_section=test_section or "generic", + result=result, + console_log_name=console_log_name, + check_log_name=check_log_name, + run_id=exec_result.run_id, + xfstests_path=worker_path, + ) + status = "PASSED" if final_result.success else "FAILED" + self.log.info( + f"Worker {exec_result.run_id}: {status} - " + f"{final_result.total_count} tests, " + f"{final_result.fail_count} failed" + ) + final_results.append(final_result) + except Exception as e: + self.log.error(f"[{exec_result.run_id}] Result collection failed: {e}") + exec_result.success = False + exec_result.fail_info = f"Result collection error: {e}" + final_results.append(exec_result) + + self.log.info(f"Result collection complete for {len(final_results)} workers") + return final_results def aggregate_results( self, @@ -501,48 +542,6 @@ def aggregate_results( return total_passed, total_failed, any_failures - def send_deferred_notifications( - self, - worker_results: List["XfstestsRunResult"], - result: "TestResult", - ) -> None: - """ - Send deferred SubTestResult notifications for all worker results. - - This method should be called AFTER run_parallel() completes and before - aggregate_results(). It sends all the SubTestResult messages that were - deferred during parallel execution to avoid deadlock in the notification - system. - - When workers run in parallel and each tries to send SubTestResult - notifications concurrently, the single-threaded message manager can - deadlock due to thread contention. By deferring notifications until - after parallel execution completes, we can send them sequentially - from the main thread. - - Args: - worker_results: List of results from run_parallel() - result: LISA TestResult object for subtest reporting - """ - self.log.info( - f"Sending deferred notifications for {len(worker_results)} workers..." - ) - - for worker_result in worker_results: - if worker_result.raw_message: - # We have deferred notification data - send it now - self.log.debug( - f"Sending deferred notification for {worker_result.run_id}" - ) - self.xfstests.create_send_subtest_msg( - test_result=result, - raw_message=worker_result.raw_message, - test_section=worker_result.run_id, # Use run_id as section - data_disk=worker_result.data_disk, - ) - - self.log.info("Deferred notifications sent successfully") - class Xfstests(Tool): """ @@ -753,11 +752,7 @@ def run_test( data_disk: str = "", test_cases: str = "", timeout: int = 14400, - run_id: str = "", - raise_on_failure: bool = True, - xfstests_path: Optional[PurePath] = None, - send_notifications: bool = True, - ) -> XfstestsRunResult: + ) -> None: """About: This method runs XFSTest on a given node with the specified test group and test cases Parameters: @@ -784,22 +779,7 @@ def run_test( test cases from different file systems, example xfs tests and generic tests. timeout(int): The time in seconds after which the test run will be timed out. Defaults to 4 hours. - run_id(str): (Optional)Unique identifier for this test run. Used to create - unique log filenames to support multiple concurrent xfstests instances. - If not provided, defaults to test_section or generates a random ID. - raise_on_failure(bool): (Optional)If True (default), raises LisaException when - tests fail. If False, returns XfstestsRunResult without raising, allowing - callers to aggregate results from multiple parallel runs before failing. - xfstests_path(PurePath): (Optional)Custom xfstests directory path. Used for - parallel worker execution where each worker needs its own directory copy - to avoid shared state conflicts. If not provided, uses the default - installation path from get_xfstests_path(). - Returns: - XfstestsRunResult: Object containing success status, failure counts, and - failure details. When raise_on_failure=True and tests fail, raises - LisaException instead of returning. Example: - # Traditional usage (raises on failure): xfstest.run_test( log_path=Path("/tmp/xfstests"), result=test_result, @@ -808,62 +788,78 @@ def run_test( data_disk="/dev/sdd", test_cases="generic/001 generic/002", timeout=14400, - run_id="ext4_run1", ) - - # Parallel execution usage (collect results, fail later): - result1 = xfstest.run_test(..., raise_on_failure=False) - result2 = xfstest.run_test(..., raise_on_failure=False) - if not result1.success or not result2.success: - combined = result1.get_failure_message() + result2.get_failure_message() - raise LisaException(combined) - - # Parallel execution with worker copies: - worker_path = xfstest.create_worker_copy(worker_id=1) - result = xfstest.run_test(..., xfstests_path=worker_path) """ # Note : the sequence is important here. # Do not rearrange !!!!! # Refer to xfstests-dev guide on https://github.com/kdave/xfstests - # Use custom path if provided, otherwise use default installation path - working_path = xfstests_path if xfstests_path else self.get_xfstests_path() - - # Generate unique run_id if not provided to support multiple concurrent runs. - # This creates unique log filenames preventing conflicts when multiple - # xfstests instances run on the same machine. - if not run_id: - run_id = test_section if test_section else generate_random_chars() - - # Use unique log filenames based on run_id to prevent conflicts - # when multiple xfstests instances run concurrently - console_log_name = f"xfstest_{run_id}.log" - check_log_name = f"check_{run_id}.log" - - # Build command line arguments for xfstests check script. - # Always include -E exclude.txt - xfstests handles missing/empty gracefully. - # This avoids SSH exists() check which blocks in parallel execution. + # Test if exclude.txt exists + xfstests_path = self.get_xfstests_path() + exclude_file_path = xfstests_path.joinpath("exclude.txt") + if self.node.shell.exists(exclude_file_path): + exclude_file = True + else: + exclude_file = False cmd = "" if test_group: cmd += f" -g {test_group}" if test_section: cmd += f" -s {test_section}" - cmd += " -E exclude.txt" + if exclude_file: + cmd += " -E exclude.txt" if test_cases: cmd += f" {test_cases}" - # Redirect output to unique log file based on run_id - cmd += f" > {console_log_name} 2>&1" + # Finally + cmd += " > xfstest.log 2>&1" - # Build the check command with proper path for worker directories. - # We use node.execute() directly instead of self.run() because self.run() - # always prepends self.command (the original installation path), which - # would run from the wrong directory when using worker copies. - # The check script must run from its own directory to find its configs. - check_cmd = f"{working_path}/check{cmd}" + # run ./check command + self.run_async( + cmd, + sudo=True, + shell=True, + force_run=True, + cwd=self.get_xfstests_path(), + ) + pgrep = self.node.tools[Pgrep] + # this is the actual process name, when xfstests runs. + # monitor till process completes or timesout + try: + pgrep.wait_processes("check", timeout=timeout) + finally: + run_result = self.check_test_results( + log_path=log_path, + test_section=test_section if test_section else "generic", + result=result, + data_disk=data_disk, + ) + if not run_result.success: + raise LisaException( + f"Fail {run_result.fail_count} cases of total " + f"{run_result.total_count}, " + f"\n\nfail cases: {run_result.fail_cases}, " + f"\n\ndetails: \n\n{run_result.fail_info}, \n\nplease investigate." + ) + + def _execute_test( + self, + check_cmd: str, + working_path: PurePath, + run_id: str, + test_section: str, + timeout: int, + ) -> XfstestsRunResult: + """Execute the xfstests check command and return a basic result. + + This is the execution-only portion: it runs the command and captures + whether execution itself succeeded. It does NOT collect/parse results + (that's check_test_results()). This separation allows run_parallel() + to execute all workers in parallel, then collect results sequentially + to avoid SSH channel deadlock. + """ run_result = XfstestsRunResult(run_id=run_id, test_section=test_section) try: - # Log the command being executed for debugging parallel execution self._log.debug( f"[{run_id}] Executing xfstests: {check_cmd[:100]}..." if len(check_cmd) > 100 @@ -879,39 +875,8 @@ def run_test( self._log.debug(f"[{run_id}] xfstests execution completed") except Exception as e: self._log.error(f"[{run_id}] xfstests execution failed: {e}") - raise - finally: - # Add random delay (1-5 seconds) to stagger check_test_results() calls - # when parallel workers complete around the same time. This prevents - # SSH connection pool contention that can cause indefinite blocking. - delay = random.uniform(1.0, 5.0) - self._log.debug( - f"[{run_id}] Waiting {delay:.1f}s before checking results..." - ) - time.sleep(delay) - self._log.debug(f"[{run_id}] Checking test results...") - run_result = self.check_test_results( - log_path=log_path, - test_section=test_section if test_section else "generic", - result=result, - data_disk=data_disk, - console_log_name=console_log_name, - check_log_name=check_log_name, - run_id=run_id, - xfstests_path=working_path, - send_notifications=send_notifications, - ) - - # Raise exception if tests failed and raise_on_failure is True - # This maintains backward compatibility with existing callers - if not run_result.success and raise_on_failure: - raise LisaException( - f"Fail {run_result.fail_count} cases of total " - f"{run_result.total_count}, " - f"\n\nfail cases: {run_result.fail_cases}, " - f"\n\ndetails: \n\n{run_result.fail_info}, \n\nplease investigate." - ) - + run_result.success = False + run_result.fail_info = f"Execution error: {e}" return run_result def _initialize(self, *args: Any, **kwargs: Any) -> None: @@ -1168,15 +1133,23 @@ def create_worker_copy( self._log.debug(f"Creating worker {worker_id} xfstests copy at {worker_path}") - # Remove existing directory if present - self.node.execute(f"rm -rf {worker_path}", sudo=True) - - # Create directory and copy xfstests - # Using cp -a to preserve permissions and symlinks - self.node.execute(f"mkdir -p {base_dir}", sudo=True) + # Combine all setup commands into a single SSH session to minimize + # concurrent SSH channel usage. When create_workers() runs all workers + # in parallel via run_in_parallel(), each separate node.execute() call + # opens a new SSH channel. With 8 workers × 4 commands = 32 simultaneous + # channels, this exceeds the SSH server's MaxSessions limit (default 10), + # causing ChannelException(2, 'Connect failed'). + # By combining into one command, each worker uses only 1 channel. + combined_cmd = ( + f"rm -rf {worker_path} && " + f"mkdir -p {base_dir} && " + f"cp -a {source_path} {worker_path} && " + f"chmod -R a+rwx {worker_path}" + ) result = self.node.execute( - f"cp -a {source_path} {worker_path}", + combined_cmd, sudo=True, + shell=True, timeout=300, # Copy can take time for large directories ) if result.exit_code != 0: @@ -1184,9 +1157,6 @@ def create_worker_copy( f"Failed to create worker {worker_id} copy: {result.stderr}" ) - # Ensure proper permissions for the worker directory - self.node.execute(f"chmod -R a+rwx {worker_path}", sudo=True) - self._log.debug(f"Worker {worker_id} xfstests copy created at {worker_path}") return worker_path @@ -1349,6 +1319,7 @@ def create_send_subtest_msg( raw_message: str, test_section: str, data_disk: str, + xfstests_path: Optional[PurePath] = None, ) -> None: """ About:This method is internal to LISA and is not intended for direct calls. @@ -1358,6 +1329,8 @@ def create_send_subtest_msg( raw_message: The raw message from the xfstests output test_section: The test group name used for testing data_disk: The data disk used for testing. ( method is partially implemented ) + xfstests_path: Optional custom xfstests directory path. Used for parallel + worker execution. If not provided, uses get_xfstests_path(). """ all_cases_match = self.__all_cases_pattern.match(raw_message) if not all_cases_match: @@ -1417,7 +1390,10 @@ def create_send_subtest_msg( info["information"]["data_disk"] = data_disk info["information"]["test_details"] = str( self.create_xfstest_stack_info( - result.name, test_section, str(result.status.name) + result.name, + test_section, + str(result.status.name), + xfstests_path=xfstests_path, ) ) # Parse actual test duration from xfstests output (e.g., "46s", "302s") @@ -1480,7 +1456,6 @@ def check_test_results( check_log_name: str = "check.log", run_id: str = "", xfstests_path: Optional[PurePath] = None, - send_notifications: bool = True, ) -> XfstestsRunResult: """ About: This method is intended to be called by run_test method only. @@ -1502,9 +1477,6 @@ def check_test_results( run_id: Unique identifier for this test run (used in result object) xfstests_path: Optional custom xfstests directory path for worker execution. If not provided, uses the default path from get_xfstests_path(). - send_notifications: If True (default), send SubTestResult notifications - immediately. If False, store raw_message in the result for deferred - notification sending (used for parallel execution to avoid deadlock). Returns: XfstestsRunResult: Object containing success status and failure details. """ @@ -1536,17 +1508,13 @@ def check_test_results( log_result.assert_exit_code() ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") raw_message = ansi_escape.sub("", log_result.stdout) - if send_notifications: - self.create_send_subtest_msg( - test_result=result, - raw_message=raw_message, - test_section=test_section, - data_disk=data_disk, - ) - else: - # Store for deferred notification (parallel execution) - run_result.raw_message = raw_message - run_result.data_disk = data_disk + self.create_send_subtest_msg( + test_result=result, + raw_message=raw_message, + test_section=test_section, + data_disk=data_disk, + xfstests_path=working_path, + ) # Use _file_exists_with_timeout instead of shell.exists() to avoid # indefinite blocking in parallel execution scenarios @@ -1733,6 +1701,7 @@ def create_xfstest_stack_info( case: str, test_section: str, test_status: str, + xfstests_path: Optional[PurePath] = None, ) -> str: """ About:This method is used to look up the xfstests results directory and extract @@ -1742,6 +1711,9 @@ def create_xfstest_stack_info( case: The test case name for which the stack info is needed test_section: The test group name used for testing test_status: The test status for the given test case + xfstests_path: Optional custom xfstests directory path. Used for parallel + worker execution where results are in worker-specific directories. + If not provided, uses get_xfstests_path() (default installation path). Returns: The method returns the stack info message for the given test case Example: @@ -1758,10 +1730,11 @@ def create_xfstest_stack_info( """ # Get XFSTest current path. we are looking at results/{test_type} directory here - xfstests_path = self.get_xfstests_path() + # Use provided path for worker execution, or default installation path + working_path = xfstests_path if xfstests_path else self.get_xfstests_path() test_class = case.split("/")[0] test_id = case.split("/")[1] - result_path = xfstests_path / f"results/{test_section}/{test_class}" + result_path = working_path / f"results/{test_section}/{test_class}" cat_tool = self.node.tools[Cat] result = "" # note: ls tool is not used here due to performance issues. diff --git a/lisa/sut_orchestrator/azure/common.py b/lisa/sut_orchestrator/azure/common.py index cb4692ea46..55d978e315 100644 --- a/lisa/sut_orchestrator/azure/common.py +++ b/lisa/sut_orchestrator/azure/common.py @@ -29,7 +29,7 @@ import requests from assertpy import assert_that from azure.core.credentials import AccessToken, TokenCredential -from azure.core.exceptions import ResourceExistsError +from azure.core.exceptions import HttpResponseError, ResourceExistsError from azure.keyvault.certificates import ( CertificateClient, CertificatePolicy, @@ -2388,7 +2388,34 @@ def get_or_create_file_share( log.debug( f" provisioned_bandwidth_mibps: {provisioned_bandwidth_mibps}" ) - share_service_client.create_share(file_share_name, **create_kwargs) + + try: + share_service_client.create_share(file_share_name, **create_kwargs) + except HttpResponseError as e: + # Handle PV2 parameters unsupported by the storage account + if "UnsupportedHeader" in str(e) and ( + "x-ms-share-provisioned-iops" in str(e) + or "x-ms-share-provisioned-bandwidth" in str(e) + ): + log.warning( + "PV2 parameters not supported by storage account, " + "retrying without provisioned_iops/provisioned_bandwidth_mibps" + ) + + # Remove PV2-specific parameters and retry + create_kwargs.pop("provisioned_iops", None) + create_kwargs.pop("provisioned_bandwidth_mibps", None) + + # For non-PV2 premium shares, minimum quota is 100 GiB + if create_kwargs.get("quota", 0) < 100: + log.warning( + f"Increasing quota from {create_kwargs.get('quota')} " + "to 100 GiB (minimum for non-PV2 premium file shares)" + ) + create_kwargs["quota"] = 100 + share_service_client.create_share(file_share_name, **create_kwargs) + else: + raise return str("//" + share_service_client.primary_hostname + "/" + file_share_name) diff --git a/lisa/tools/git.py b/lisa/tools/git.py index 7d9b0e8ccd..65bb6c1ef9 100644 --- a/lisa/tools/git.py +++ b/lisa/tools/git.py @@ -614,7 +614,7 @@ def worktree_list( def worktree_remove( self, cwd: pathlib.PurePath, - path: pathlib.PurePath, + path: str, force: bool = False, ) -> None: cmd = "worktree remove" diff --git a/lisa/transformers/kernel_source_installer.py b/lisa/transformers/kernel_source_installer.py index 04c95ac26d..b746522ad5 100644 --- a/lisa/transformers/kernel_source_installer.py +++ b/lisa/transformers/kernel_source_installer.py @@ -462,12 +462,12 @@ def _build_code( make_args = "" if use_ccache: - make_args = "CC='ccache gcc'" - node.execute( - cmd=f"export CCACHE_DIR={str(code_path.parent)}/.ccache", - shell=True, - no_error_log=True, - ) + ccache_dir = code_path.parent / ".ccache" + if not node.shell.exists(ccache_dir): + node.execute(f"mkdir -p {ccache_dir}", sudo=True) + node.execute(f"chmod 0777 {ccache_dir}", sudo=True) + + make_args = f"CC='ccache gcc' CCACHE_DIR={str(ccache_dir)}" # set timeout to 2 hours make.make(arguments=make_args, cwd=code_path, timeout=60 * 60 * 2) diff --git a/lisa/transformers/kernel_source_packager.py b/lisa/transformers/kernel_source_packager.py index df8cbabc05..79fe88d4ab 100644 --- a/lisa/transformers/kernel_source_packager.py +++ b/lisa/transformers/kernel_source_packager.py @@ -32,7 +32,6 @@ class RepoWorktreeSchema(RepoLocationSchema): worktree_name: str = "" worktree_repo: str = "" worktree_ref: str = "" - worktree_local_branch: str = "" @dataclass_json() @@ -388,6 +387,131 @@ def type_name(cls) -> str: def type_schema(cls) -> Type[schema.TypedSchema]: return RepoWorktreeSchema + def _cleanup_detached_worktrees( + self, + code_path: PurePath, + ) -> None: + git = self._node.tools[Git] + + # Remove detached worktrees and their associated branches. + worktrees = git.worktree_list(cwd=code_path) + detached_worktrees = [ + wt["path"] for wt in worktrees if wt.get("branch") == "(detached)" + ] + + for worktree in detached_worktrees: + try: + git.worktree_remove(cwd=code_path, path=worktree) + # Also deleting branches created by the worktree for coherence. + # This doesn't work for mainline in + # /mnt/code/linux pointed by origin. + # Use basename since worktree is a full path from worktree_list. + worktree_name = PurePath(worktree).name + self._node.execute( + f"git branch --list '{worktree_name}-*' | xargs git branch -D", + shell=True, + no_error_log=True, + cwd=code_path, + ) + except Exception as e: + self._log.debug( + "failed to cleanup detached worktree and branches " + f"'{worktree}': {e}" + ) + + git.worktree_prune(cwd=code_path) + + def _is_tag( + self, + target_path: PurePath, + ref: str, + ) -> bool: + result = self._node.execute( + f"git tag -l {ref}", + shell=True, + cwd=target_path, + ) + return bool(result.stdout.strip()) + + def _checkout_target_ref( + self, + target_path: PurePath, + target_ref: str, + remote: str, + ) -> None: + git = self._node.tools[Git] + + # Tags are not namespaced under remotes (there is no remote/tag-name). + # Detect tags and use the ref directly instead of remote/ref. + is_tag = self._is_tag(target_path, target_ref) + if is_tag: + remote_ref = target_ref + self._log.debug( + f"'{target_ref}' is a tag, using it directly instead of " + f"'{remote}/{target_ref}'." + ) + else: + remote_ref = f"{remote}/{target_ref}" + + # Checkout and update the target ref, with force-checkout fallback. + expected_local_branch_name = f"{remote}-{target_ref}" + try: + local_branches_result = self._node.execute( + "git branch --format='%(refname:short)'", + shell=True, + cwd=target_path, + ) + local_branches = [ + b.strip().strip("'") + for b in local_branches_result.stdout.splitlines() + if b.strip() + ] + if expected_local_branch_name in local_branches: + self._log.debug("Pulling upstream in an existing branch.") + self._node.execute( + f"git checkout -f {expected_local_branch_name}", + shell=True, + cwd=target_path, + expected_exit_code=0, + ) + if not is_tag: + git.pull(cwd=target_path) + elif target_ref in local_branches: + self._log.debug("Pulling upstream in an existing branch.") + self._node.execute( + f"git checkout -f {target_ref}", + shell=True, + cwd=target_path, + expected_exit_code=0, + ) + if not is_tag: + git.pull(cwd=target_path) + else: + self._log.debug("Checking out a new branch synced with remote.") + self._node.execute( + f"git checkout -b {expected_local_branch_name} {remote_ref}", + shell=True, + cwd=target_path, + expected_exit_code=0, + ) + + self._log.info( + f"checkout code from: '{target_ref}', in " + f"'{git.get_current_branch(cwd=target_path)}'" + ) + except Exception: + self._log.debug("Checking out a new branch force synced with remote") + self._node.execute( + f"git checkout -B {expected_local_branch_name} {remote_ref}", + shell=True, + cwd=target_path, + expected_exit_code=0, + ) + self._log.info( + f"checkout code from: '{target_ref}', in " + f"'{git.get_current_branch(cwd=target_path)}'" + ) + def get_source_code(self) -> PurePath: runbook: RepoWorktreeSchema = cast(RepoWorktreeSchema, self.runbook) @@ -418,31 +542,42 @@ def get_source_code(self) -> PurePath: else: code_path = code_path / repo_name - # check if the 'repo' is already a remote url - remote_exists = False - remote = "" + remote = "origin" remotes = git.remote_list(code_path) self._log.debug(f"existing remotes: {remotes}") - for remote in remotes: - if runbook.worktree_repo == git.remote_get_url(code_path, remote): - remote_exists = True - break - - if not remote_exists: - remote = runbook.worktree_name - self._log.info(f"adding remote {remote} for {runbook.worktree_repo}") - git.remote_add(cwd=code_path, name=remote, url=runbook.worktree_repo) - git.fetch( + if runbook.worktree_name: + if runbook.worktree_name in remotes: + self._log.debug("Setting the remote based on worktree name/path.") + remote = runbook.worktree_name + if runbook.worktree_repo: + assert runbook.worktree_repo == git.remote_get_url( + code_path, remote + ), f"Existing remote url doesn't match with {runbook.worktree_repo}" + elif runbook.worktree_name == repo_name: + self._log.debug("Using the upstream repo pointed by 'origin' remote") + + else: + assert runbook.worktree_repo, "Remote can not be added without a URL" + remote = runbook.worktree_name + self._log.info(f"adding remote {remote} for {runbook.worktree_repo}") + git.remote_add(cwd=code_path, name=remote, url=runbook.worktree_repo) + + self._node.execute( + f"git fetch -p {remote} --force --tags", + shell=True, + no_info_log=False, cwd=code_path, - remote=remote, + expected_exit_code=0, ) target_path = code_path target_ref = runbook.ref if runbook.worktree_name: + self._cleanup_detached_worktrees(code_path) + worktree_path = code_path.parent / runbook.worktree_name - git.worktree_prune(cwd=code_path) if not git.worktree_exists(cwd=code_path, path=str(worktree_path)): + assert runbook.worktree_ref, "Worktree ref needs to be set by user" self._log.info( f"creating a new worktree at {worktree_path} " f"pointing at {remote}/{runbook.worktree_ref}" @@ -452,7 +587,7 @@ def get_source_code(self) -> PurePath: path=worktree_path, remote=remote, remote_ref=runbook.worktree_ref, - new_branch=runbook.worktree_local_branch, + new_branch=f"{remote}-{runbook.worktree_ref}", track=True, ) @@ -460,16 +595,14 @@ def get_source_code(self) -> PurePath: self._log.info(f"Kernel HEAD is now at : {latest_commit_id}") return worktree_path - # worktree exists - target_ref = runbook.worktree_ref + # worktree exists — fetch the tracking remote and update + self._log.debug("Using the existing worktree") + if runbook.worktree_ref: + target_ref = runbook.worktree_ref target_path = worktree_path if target_ref: - if git.get_current_branch(cwd=target_path) == target_ref: - git.pull(cwd=target_path) - - git.checkout(ref=target_ref, cwd=target_path) - self._log.info(f"checkout code from: '{target_ref}'") + self._checkout_target_ref(target_path, target_ref, remote) latest_commit_id = git.get_latest_commit_id(cwd=target_path) self._log.info(f"Kernel HEAD is now at : {latest_commit_id}") diff --git a/pyproject.toml b/pyproject.toml index 47e1cc03ce..dbce01c291 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,7 +53,7 @@ azure = [ "azure-mgmt-serialconsole ~= 1.0.0", "azure-mgmt-storage ~= 21.2.1", "azure-storage-blob ~= 12.23.0", - "azure-storage-file-share ~= 12.20.0", + "azure-storage-file-share ~= 12.24.0", "azure-keyvault-secrets ~= 4.7.0", "azure-keyvault-certificates ~= 4.7.0", "msrestazure ~= 0.6.4",