diff --git a/bugwarrior/collect.py b/bugwarrior/collect.py index 42121c5d..bcccb8f0 100644 --- a/bugwarrior/collect.py +++ b/bugwarrior/collect.py @@ -1,17 +1,18 @@ +from collections.abc import Iterator import copy from functools import cache from importlib.metadata import entry_points import logging import multiprocessing import time -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from jinja2 import Template from taskw.task import Task if TYPE_CHECKING: from bugwarrior.config.validation import Config - from bugwarrior.services import Service + from bugwarrior.services import Issue, Service log = logging.getLogger(__name__) @@ -47,7 +48,7 @@ def get_service_instances(conf: "Config") -> list["Service"]: ] -def _aggregate_issues(service: "Service", queue: multiprocessing.Queue): +def _aggregate_issues(service: "Service", queue: multiprocessing.Queue) -> None: """This worker function is separated out from the main :func:`aggregate_issues` func only so that we can use multiprocessing on it for speed reasons. @@ -80,7 +81,7 @@ def _aggregate_issues(service: "Service", queue: multiprocessing.Queue): log.info(f"Done with [{target}] in {duration}.") -def aggregate_issues(conf: "Config", debug: bool): +def aggregate_issues(conf: "Config", debug: bool) -> Iterator[dict | tuple[str, str]]: """Return all issues from every target.""" log.info("Starting to aggregate remote issues.") @@ -129,10 +130,10 @@ def aggregate_issues(conf: "Config", debug: bool): class TaskConstructor: """Construct a taskwarrior task from a foreign record.""" - def __init__(self, issue): + def __init__(self, issue: "Issue") -> None: self.issue = issue - def get_added_tags(self): + def get_added_tags(self) -> list[str]: added_tags = [] for tag in self.issue.config.add_tags: tag = Template(tag).render(self.get_template_context()) @@ -141,7 +142,7 @@ def get_added_tags(self): return added_tags - def get_taskwarrior_record(self, refined=True) -> dict: + def get_taskwarrior_record(self, refined: bool = True) -> dict[str, Any]: if not getattr(self, '_taskwarrior_record', None): self._taskwarrior_record = self.issue.to_taskwarrior() record = copy.deepcopy(self._taskwarrior_record) @@ -153,13 +154,13 @@ def get_taskwarrior_record(self, refined=True) -> dict: record['tags'].extend(self.get_added_tags()) return record - def get_template_context(self): + def get_template_context(self) -> dict[str, Any]: context = self.get_taskwarrior_record(refined=False).copy() context.update(self.issue.extra) context.update({'description': self.issue.get_default_description()}) return context - def refine_record(self, record): + def refine_record(self, record: dict[str, Any]) -> dict[str, Any]: for field in Task.FIELDS.keys(): if field in self.issue.config.templates: template = Template(self.issue.config.templates[field]) diff --git a/bugwarrior/command.py b/bugwarrior/command.py index ab47a398..a66fc993 100644 --- a/bugwarrior/command.py +++ b/bugwarrior/command.py @@ -1,9 +1,10 @@ +from collections.abc import Callable, Iterator import functools import getpass import logging import os import sys -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import click from lockfile import LockTimeout @@ -22,7 +23,7 @@ lst = list -def _get_section_name(flavor): +def _get_section_name(flavor: str | None) -> str: if flavor: return 'flavor.' + flavor return 'general' @@ -46,10 +47,14 @@ def _try_load_config( sys.exit(1) -def _legacy_cli_deprecation_warning(subcommand_callback): +def _legacy_cli_deprecation_warning( + subcommand_callback: Callable[..., Any], +) -> Callable[..., Any]: @functools.wraps(subcommand_callback) @click.pass_context - def wrapped_subcommand_callback(ctx, *args, **kwargs): + def wrapped_subcommand_callback( + ctx: click.Context, *args: Any, **kwargs: Any + ) -> Any: if ctx.find_root().command_path != 'bugwarrior': old_command = ctx.command_path new_command = ctx.command_path.replace('-', ' ') @@ -71,16 +76,18 @@ class AliasedCli(click.Group): with the old cli api. """ - def list_commands(self, ctx): - return ctx.command.commands.keys() + def list_commands(self, ctx: click.Context) -> list[str]: + assert isinstance(ctx.command, click.Group) + return list(ctx.command.commands) - def get_command(self, ctx, cmd_name): - return ctx.command.commands[cmd_name] + def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None: + assert isinstance(ctx.command, click.Group) + return ctx.command.commands.get(cmd_name) @click.command(cls=AliasedCli) @click.version_option() -def cli(): +def cli() -> None: pass @@ -93,7 +100,9 @@ def cli(): ) @click.option('--quiet', is_flag=True, help='Set logging level to WARNING.') @_legacy_cli_deprecation_warning -def pull(dry_run, flavor, interactive, debug, quiet): +def pull( + dry_run: bool, flavor: str | None, interactive: bool, debug: bool, quiet: bool +) -> None: """Pull down tasks from forges and add them to your taskwarrior tasks. Relies on configuration file. @@ -128,7 +137,7 @@ def pull(dry_run, flavor, interactive, debug, quiet): @cli.group() @_legacy_cli_deprecation_warning -def vault(): +def vault() -> None: """Password/keyring management for bugwarrior. If you use the keyring password oracle in your bugwarrior config, this tool @@ -138,7 +147,7 @@ def vault(): pass -def targets(): +def targets() -> Iterator[str]: config = _try_load_config('general') for service_config in config.service_configs: for value in dict(service_config).values(): @@ -149,7 +158,7 @@ def targets(): @vault.command() -def list(): +def list() -> None: pws = lst(targets()) print("%i @oracle:use_keyring passwords in bugwarriorrc" % len(pws)) for section in pws: @@ -159,7 +168,7 @@ def list(): @vault.command() @click.argument('target') @click.argument('username') -def clear(target, username): +def clear(target: str, username: str) -> None: target_list = lst(targets()) if target not in target_list: raise ValueError("%s must be one of %r" % (target, target_list)) @@ -175,7 +184,7 @@ def clear(target, username): @vault.command() @click.argument('target') @click.argument('username') -def set(target, username): +def set(target: str, username: str) -> None: target_list = lst(targets()) if target not in target_list: log.warning( @@ -192,7 +201,7 @@ def set(target, username): @cli.command() @click.option('--flavor', default=None, help='The flavor to use') @_legacy_cli_deprecation_warning -def uda(flavor): +def uda(flavor: str | None) -> None: """ List bugwarrior-managed uda's. @@ -226,7 +235,7 @@ def uda(flavor): @click.argument( 'rcfile', required=False, default=get_config_path(), type=click.Path(exists=True) ) -def ini2toml(rcfile): +def ini2toml(rcfile: str) -> None: """Convert ini bugwarriorrc to toml and print result to stdout.""" try: from ini2toml.api import Translator diff --git a/bugwarrior/config/__init__.py b/bugwarrior/config/__init__.py index 489829b4..83fbdbe6 100644 --- a/bugwarrior/config/__init__.py +++ b/bugwarrior/config/__init__.py @@ -10,6 +10,7 @@ ExpandedPath, # noqa: F401 MainSectionConfig, NoSchemeUrl, # noqa: F401 + Priority, # noqa: F401 ServiceConfig, StrippedTrailingSlashUrl, # noqa: F401 TaskrcPath, # noqa: F401 diff --git a/bugwarrior/config/data.py b/bugwarrior/config/data.py index f016974a..c9b30682 100644 --- a/bugwarrior/config/data.py +++ b/bugwarrior/config/data.py @@ -1,19 +1,20 @@ import json import os +from pathlib import Path import subprocess import typing from lockfile.pidlockfile import PIDLockFile -def get_data_path(taskrc): +def get_data_path(taskrc: str | Path) -> str: # We cannot use the taskw module here because it doesn't really support # the `_` subcommands properly (`rc:` can't be used for them). line_prefix = 'data.location=' # Take a copy of the environment and add our taskrc to it. env = dict(os.environ) - env['TASKRC'] = taskrc + env['TASKRC'] = str(taskrc) tw_show = subprocess.Popen(('task', '_show'), stdout=subprocess.PIPE, env=env) data_location = subprocess.check_output( @@ -36,7 +37,7 @@ class BugwarriorData: key-value store. """ - def __init__(self, data_path): + def __init__(self, data_path: str) -> None: self._datafile = os.path.join(data_path, 'bugwarrior.data') self._lockfile = os.path.join(data_path, 'bugwarrior-data.lockfile') #: Taskwarrior's ``data.location`` configuration value. If necessary, @@ -44,23 +45,25 @@ def __init__(self, data_path): self.path = data_path @classmethod - def __get_pydantic_json_schema__(cls, core_schema, handler): + def __get_pydantic_json_schema__( + cls, core_schema: typing.Any, handler: typing.Any + ) -> dict[str, str]: """Fix schema generation in pydantic v2.""" return {"type": "object", "description": "Local data storage"} - def get_data(self) -> dict: + def get_data(self) -> dict[str, typing.Any]: """Return all data from the ``bugwarrior.data`` file.""" with open(self._datafile) as jsondata: return json.load(jsondata) - def get(self, key) -> typing.Any: + def get(self, key: str) -> typing.Any: """Return a value stored in the ``bugwarrior.data`` file.""" try: return self.get_data()[key] except OSError: # File does not exist. return None - def set(self, key, value): + def set(self, key: str, value: typing.Any) -> None: """Set a value in the ``bugwarrior.data`` file.""" with PIDLockFile(self._lockfile): try: diff --git a/bugwarrior/config/ini2toml_plugin.py b/bugwarrior/config/ini2toml_plugin.py index dc2a9f25..e9821eef 100644 --- a/bugwarrior/config/ini2toml_plugin.py +++ b/bugwarrior/config/ini2toml_plugin.py @@ -18,7 +18,7 @@ log = logging.getLogger(__name__) -def to_type(section: IntermediateRepr, key: str, converter: typing.Callable): +def to_type(section: IntermediateRepr, key: str, converter: typing.Callable) -> None: try: val = section[key] except KeyError: @@ -27,15 +27,15 @@ def to_type(section: IntermediateRepr, key: str, converter: typing.Callable): section[key] = converter(val) -def to_bool(section: IntermediateRepr, key: str): +def to_bool(section: IntermediateRepr, key: str) -> None: to_type(section, key, pydantic.TypeAdapter(bool).validate_python) -def to_int(section: IntermediateRepr, key: str): +def to_int(section: IntermediateRepr, key: str) -> None: to_type(section, key, int) -def to_list(section: IntermediateRepr, key: str): +def to_list(section: IntermediateRepr, key: str) -> None: to_type(section, key, parse_config_list) @@ -53,7 +53,9 @@ def get_field_type(attrs: dict) -> typing.Optional[str]: return None -def convert_section(section: IntermediateRepr, schema: type[pydantic.BaseModel]): +def convert_section( + section: IntermediateRepr, schema: type[pydantic.BaseModel] +) -> None: for prop, attrs in schema.model_json_schema()['properties'].items(): field_type = get_field_type(attrs) if field_type == 'boolean': @@ -129,7 +131,7 @@ def unquote_flavors(file_contents: str) -> str: ) -def activate(translator: Translator): +def activate(translator: Translator) -> None: profile = translator["bugwarriorrc"] profile.help_text = "Convert 'bugwarriorrc' files to 'bugwarrior.toml'" profile.intermediate_processors.append(process_values) diff --git a/bugwarrior/config/load.py b/bugwarrior/config/load.py index e7454c1d..04b22796 100644 --- a/bugwarrior/config/load.py +++ b/bugwarrior/config/load.py @@ -17,7 +17,7 @@ BUGWARRIORRC = "BUGWARRIORRC" -def configure_logging(logfile, loglevel): +def configure_logging(logfile: str | Path | None, loglevel: str) -> None: logging.basicConfig(filename=logfile, level=loglevel) # In general, its nice to log "everything", but some of the loggers from @@ -32,7 +32,7 @@ def configure_logging(logfile, loglevel): logging.getLogger(spammer).setLevel(logging.WARNING) -def get_config_path(): +def get_config_path() -> str: """Determine path to config file. See docs/manpage.rst for precedence.""" if os.environ.get(BUGWARRIORRC): return os.environ[BUGWARRIORRC] @@ -66,12 +66,12 @@ def format_config(config: dict) -> dict[str, Any]: return config -def parse_toml_file(configpath: str) -> dict: +def parse_toml_file(configpath: str) -> dict[str, Any]: with open(configpath, 'rb') as file: return tomllib.load(file) -def parse_ini_file(configpath: str) -> dict: +def parse_ini_file(configpath: str) -> dict[str, Any]: rawconfig = BugwarriorConfigParser() with open(configpath, encoding="utf-8") as buff: rawconfig.read_file(buff) @@ -109,7 +109,7 @@ def parse_ini_file(configpath: str) -> dict: return config -def parse_file(configpath: str) -> dict: +def parse_file(configpath: str) -> dict[str, Any]: if Path(configpath).suffix == '.toml': config = parse_toml_file(configpath) else: @@ -117,7 +117,7 @@ def parse_file(configpath: str) -> dict: return format_config(config) -def load_config(main_section, interactive, quiet) -> Config: +def load_config(main_section: str, interactive: bool, quiet: bool) -> Config: configpath = get_config_path() rawconfig = parse_file(configpath) for flavor in rawconfig['flavor'].values(): @@ -131,12 +131,12 @@ def load_config(main_section, interactive, quiet) -> Config: # ConfigParser is not a new-style class, so inherit from object to fix super(). class BugwarriorConfigParser(configparser.ConfigParser): - def __init__(self, *args, allow_no_value=True, **kwargs): + def __init__(self, *args: Any, allow_no_value: bool = True, **kwargs: Any) -> None: super().__init__( *args, allow_no_value=allow_no_value, interpolation=None, **kwargs ) - def getint(self, section, option, **kwargs): + def getint(self, section: str, option: str, **kwargs: Any) -> int | None: # ty: ignore[invalid-method-override] """Accepts both integers and empty values.""" try: return super().getint(section, option, **kwargs) @@ -150,6 +150,6 @@ def getint(self, section, option, **kwargs): ) ) - def optionxform(self, optionstr): + def optionxform(self, optionstr: str) -> str: """Do not lowercase key names.""" return optionstr diff --git a/bugwarrior/config/schema.py b/bugwarrior/config/schema.py index 4f9f20f8..22ab5c7c 100644 --- a/bugwarrior/config/schema.py +++ b/bugwarrior/config/schema.py @@ -3,7 +3,7 @@ from pathlib import Path import re import typing -from typing import Annotated, Literal +from typing import Annotated, Any, Literal import pydantic from pydantic import ( @@ -25,8 +25,10 @@ log = logging.getLogger(__name__) +Priority = Literal['', 'L', 'M', 'H'] -def validate_url(url: str): + +def validate_url(url: str) -> str: return str(AnyUrl(url)).rstrip("/") @@ -201,18 +203,18 @@ class ServiceConfig(_ServiceConfig): target: str # Added during validation (computed field) - templates: dict = {} + templates: dict[str, str] = {} # Optional fields shared by all services. only_if_assigned: str = "" also_unassigned: bool = False - default_priority: Literal["", "L", "M", "H"] = "M" + default_priority: Priority = "M" add_tags: ConfigList = [] static_fields: ConfigList = [] @model_validator(mode="before") @classmethod - def compute_templates(cls, values): + def compute_templates(cls, values: dict[str, Any]) -> dict[str, Any]: """Get any defined templates for configuration values. Users can override the value of any Taskwarrior field using @@ -249,7 +251,9 @@ def compute_templates(cls, values): @field_validator('include_merge_requests', mode='after', check_fields=False) @classmethod - def deprecate_filter_merge_requests(cls, value, info: ValidationInfo): + def deprecate_filter_merge_requests( + cls, value: bool | str, info: ValidationInfo + ) -> bool | str: if not hasattr(cls, '_DEPRECATE_FILTER_MERGE_REQUESTS'): return value @@ -269,7 +273,7 @@ def deprecate_filter_merge_requests(cls, value, info: ValidationInfo): @field_validator('project_name', mode='after', check_fields=False) @classmethod - def deprecate_project_name(cls, value): + def deprecate_project_name(cls, value: str) -> str: if hasattr(cls, '_DEPRECATE_PROJECT_NAME'): if value != '': log.warning('project_name is deprecated in favor of project_template') diff --git a/bugwarrior/config/secrets.py b/bugwarrior/config/secrets.py index 947f00e6..97813ec3 100644 --- a/bugwarrior/config/secrets.py +++ b/bugwarrior/config/secrets.py @@ -1,11 +1,12 @@ import logging import subprocess import sys +from types import ModuleType log = logging.getLogger(__name__) -def get_keyring(): +def get_keyring() -> ModuleType: """Try to import and return optional keyring dependency.""" try: import keyring @@ -17,7 +18,9 @@ def get_keyring(): return keyring -def get_service_password(service, username, oracle=None, interactive=False): +def get_service_password( + service: str, username: str, oracle: str | None = None, interactive: bool = False +) -> str: """ Retrieve the sensitive password for a service by: @@ -70,7 +73,7 @@ def get_service_password(service, username, oracle=None, interactive=False): return password -def oracle_eval(command): +def oracle_eval(command: str) -> str: """Retrieve password from the given command""" p = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE diff --git a/bugwarrior/config/validation.py b/bugwarrior/config/validation.py index 781121ea..5c827b81 100644 --- a/bugwarrior/config/validation.py +++ b/bugwarrior/config/validation.py @@ -107,7 +107,9 @@ def _format_extra_section_error(error: ErrorDetails | dict) -> str: return _format_field_error(section_name, loc[1], msg, error) -def raise_validation_error(msg, config_path, error_count=1) -> NoReturn: +def raise_validation_error( + msg: str, config_path: str, error_count: int = 1 +) -> NoReturn: log.error( ("Validation error" if error_count == 1 else f"{error_count} validation errors") + f" found in {config_path}\n" @@ -116,7 +118,7 @@ def raise_validation_error(msg, config_path, error_count=1) -> NoReturn: sys.exit(1) -def get_service_config_union_type(services: list[dict[str, Any]]): +def get_service_config_union_type(services: list[dict[str, Any]]) -> Any: """ Return a Union type of the ServiceConfig subclasses of the services actually configured. diff --git a/bugwarrior/db.py b/bugwarrior/db.py index 473dcfa5..9477d4da 100644 --- a/bugwarrior/db.py +++ b/bugwarrior/db.py @@ -1,10 +1,10 @@ -from collections.abc import Iterable +from collections.abc import Iterable, Iterator import itertools import json import logging import re import subprocess -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from taskw import TaskWarriorShellout from taskw.exceptions import TaskwarriorError @@ -26,11 +26,11 @@ class MultipleMatches(Exception): pass -def get_normalized_annotation(annotation): +def get_normalized_annotation(annotation: str) -> str: return re.sub(r'[\W_]', '', str(annotation)) -def get_annotation_hamming_distance(left, right): +def get_annotation_hamming_distance(left: str, right: str) -> int: left = get_normalized_annotation(left) right = get_normalized_annotation(right) if len(left) > len(right): @@ -40,7 +40,7 @@ def get_annotation_hamming_distance(left, right): return hamdist(left, right) -def hamdist(str1, str2): +def hamdist(str1: str, str2: str) -> int: """Count the # of differences between equal length strings str1 and str2""" diffs = 0 for ch1, ch2 in zip(str1, str2): @@ -49,7 +49,9 @@ def hamdist(str1, str2): return diffs -def get_managed_task_uuids(tw, key_list): +def get_managed_task_uuids( + tw: TaskWarriorShellout, key_list: dict[str, list[str]] +) -> set[str]: expected_task_ids = set() for keys in key_list.values(): tasks = tw.filter_tasks( @@ -63,7 +65,7 @@ def get_managed_task_uuids(tw, key_list): return expected_task_ids -def make_unique_identifier(keys: dict, issue: dict) -> str: +def make_unique_identifier(keys: dict[str, list[str]], issue: dict[str, Any]) -> str: """For a given issue, make an identifier from its unique keys. This is not the same as the taskwarrior uuid, which is assigned @@ -76,7 +78,9 @@ def make_unique_identifier(keys: dict, issue: dict) -> str: raise RuntimeError("Could not determine unique identifier for %s" % issue) -def find_taskwarrior_uuid(tw, keys, issue): +def find_taskwarrior_uuid( + tw: TaskWarriorShellout, keys: dict[str, list[str]], issue: dict[str, Any] +) -> str: """For a given issue issue, find its local taskwarrior UUID. Assembles a list of task IDs existing in taskwarrior @@ -151,7 +155,12 @@ def find_taskwarrior_uuid(tw, keys, issue): raise NotFound("No issue was found matching %s" % issue) -def replace_left(field, local_task, remote_issue, keep_items=[]): +def replace_left( + field: str, + local_task: dict[str, Any], + remote_issue: dict[str, Any], + keep_items: list[str] = [], +) -> None: """Replace array field from the remote_issue to the local_task * Local 'left' entries are suppressed, unless those listed in keep_items. @@ -188,7 +197,12 @@ def replace_left(field, local_task, remote_issue, keep_items=[]): local_task[field] += remote_field -def merge_left(field, local_task, remote_issue, hamming=False): +def merge_left( + field: str, + local_task: dict[str, Any], + remote_issue: dict[str, Any], + hamming: bool = False, +) -> None: """Merge array field from the remote_issue into local_task * Local 'left' entries are preserved without modification @@ -236,7 +250,7 @@ def merge_left(field, local_task, remote_issue, hamming=False): ) -def run_hooks(pre_import): +def run_hooks(pre_import: list[str]) -> None: for hook in pre_import: exit_code = subprocess.call(hook, shell=True) if exit_code != 0: @@ -245,7 +259,11 @@ def run_hooks(pre_import): raise RuntimeError(msg) -def synchronize(issue_generator, conf: "Config", dry_run: bool = False): +def synchronize( + issue_generator: Iterable[dict | tuple[str, str]], + conf: "Config", + dry_run: bool = False, +) -> None: services = [service_config.service for service_config in conf.service_configs] key_list = build_key_list(services) uda_list = build_uda_config_overrides(services) @@ -274,7 +292,10 @@ def synchronize(issue_generator, conf: "Config", dry_run: bool = False): } for issue in issue_generator: - if isinstance(issue, tuple) and issue[0] == 'SERVICE FAILED': + if isinstance(issue, tuple): + assert issue[0] == 'SERVICE FAILED', ( + "'issue' should only be a tuple in case of a failure" + ) successful_config_map.pop(issue[1]) continue @@ -406,7 +427,7 @@ def synchronize(issue_generator, conf: "Config", dry_run: bool = False): service_config.service for service_config in successful_config_map.values() ), ) - issue_updates['closed'] = succeeded_service_task_uuids - seen_uuids + issue_updates['closed'] = list(succeeded_service_task_uuids - seen_uuids) log.info("Closing %i tasks", len(issue_updates['closed'])) for issue in issue_updates['closed']: _, task_info = tw.get_task(uuid=issue) @@ -449,20 +470,20 @@ def synchronize(issue_generator, conf: "Config", dry_run: bool = False): ) -def build_key_list(services: Iterable[str]): +def build_key_list(services: Iterable[str]) -> dict[str, list[str]]: return { service: get_service(service).ISSUE_CLASS.UNIQUE_KEY for service in services } -def get_defined_udas_as_strings(conf: "Config"): +def get_defined_udas_as_strings(conf: "Config") -> Iterator[str]: uda_list = build_uda_config_overrides( service_config.service for service_config in conf.service_configs ) yield from convert_override_args_to_taskrc_settings(uda_list) -def build_uda_config_overrides(services: Iterable[str]): +def build_uda_config_overrides(services: Iterable[str]) -> dict[str, Any]: """Returns a list of UDAs defined by given targets For all targets in `targets`, build a dictionary of configuration overrides @@ -498,7 +519,9 @@ def build_uda_config_overrides(services: Iterable[str]): return {'uda': targets_udas} -def convert_override_args_to_taskrc_settings(config, prefix=''): +def convert_override_args_to_taskrc_settings( + config: dict[str, Any], prefix: str = '' +) -> list[str]: args = [] for k, v in config.items(): if isinstance(v, dict): diff --git a/bugwarrior/docs/conf.py b/bugwarrior/docs/conf.py index c7bdd29b..e4c60c61 100644 --- a/bugwarrior/docs/conf.py +++ b/bugwarrior/docs/conf.py @@ -321,4 +321,7 @@ } # Suppress warnings for types that can't be resolved via intersphinx -nitpick_ignore = [('py:class', 'ConfigDict')] +nitpick_ignore = [ + ('py:class', 'ConfigDict'), + ('py:class', 'bugwarrior.services.T_Issue'), +] diff --git a/bugwarrior/notifications.py b/bugwarrior/notifications.py index 39560849..b426cff1 100644 --- a/bugwarrior/notifications.py +++ b/bugwarrior/notifications.py @@ -1,15 +1,19 @@ import os import subprocess +from typing import TYPE_CHECKING, Any import warnings import requests +if TYPE_CHECKING: + from bugwarrior.config.schema import Notifications + cache_dir = os.path.expanduser(os.getenv('XDG_CACHE_HOME', "~/.cache") + "/bugwarrior") logo_path = cache_dir + "/logo.png" logo_url = "https://upload.wikimedia.org/wikipedia/en/5/59/Taskwarrior_logo.png" -def _cache_logo(): +def _cache_logo() -> None: if os.path.exists(logo_path): return @@ -21,7 +25,7 @@ def _cache_logo(): f.write(response.content) -def _get_metadata(issue): +def _get_metadata(issue: dict[str, Any]) -> str: due = '' tags = '' priority = '' @@ -47,7 +51,7 @@ def _get_metadata(issue): return metadata -def send_notification(issue, op, conf): +def send_notification(issue: dict[str, Any], op: str, conf: "Notifications") -> None: notify_backend = conf.backend if notify_backend == 'pynotify': diff --git a/bugwarrior/services/__init__.py b/bugwarrior/services/__init__.py index 908885a8..df1b9f9a 100644 --- a/bugwarrior/services/__init__.py +++ b/bugwarrior/services/__init__.py @@ -4,13 +4,13 @@ """ import abc -from collections.abc import Iterable +from collections.abc import Iterable, Iterator import datetime import logging import math import os import re -import typing +from typing import Any, Generic, Optional, TypeVar import zoneinfo from dateutil.parser import parse as parse_date @@ -43,20 +43,20 @@ class URLShortener: _instance = None - def __new__(cls, *args, **kwargs): + def __new__(cls, *args: Any, **kwargs: Any) -> "URLShortener": if not cls._instance: cls._instance = super().__new__(cls, *args, **kwargs) return cls._instance @CACHE_REGION.cache_on_arguments() - def shorten(self, url): + def shorten(self, url: str) -> str: if not url: return '' base = 'https://da.gd/s' return requests.get(base, params=dict(url=url)).text.strip() -def get_processed_url(main_config: schema.MainSectionConfig, url: str): +def get_processed_url(main_config: schema.MainSectionConfig, url: str) -> str: """Returns a URL with conditional processing. If the following config key are set: @@ -105,23 +105,23 @@ class Issue(abc.ABC): def __init__( self, - foreign_record: dict, + foreign_record: dict[str, Any], config: schema.ServiceConfig, main_config: schema.MainSectionConfig, - extra: dict, - ): + extra: dict[str, Any], + ) -> None: #: Data retrieved from the external service. - self.record: dict = foreign_record + self.record = foreign_record #: An object whose attributes are this service's configuration values. self.config: schema.ServiceConfig = config #: An object whose attributes are the #: :ref:`common_configuration:Main Section` configuration values. self.main_config: schema.MainSectionConfig = main_config #: Data computed by the :class:`Service` class. - self.extra: dict = extra + self.extra = extra @abc.abstractmethod - def to_taskwarrior(self) -> dict: + def to_taskwarrior(self) -> dict[str, Any]: """Transform a foreign record into a taskwarrior dictionary.""" raise NotImplementedError() @@ -136,10 +136,10 @@ def get_default_description(self) -> str: def get_tags_from_labels( self, - labels: list, - toggle_option='import_labels_as_tags', - template_option='label_template', - template_variable='label', + labels: list[str], + toggle_option: str = 'import_labels_as_tags', + template_option: str = 'label_template', + template_variable: str = 'label', ) -> list[str]: """Transform labels into suitable taskwarrior tags, respecting configuration options. @@ -166,14 +166,14 @@ def get_tags_from_labels( return tags - def get_priority(self) -> typing.Literal['', 'L', 'M', 'H']: + def get_priority(self) -> schema.Priority: """Return the priority of this issue, falling back to ``default_priority`` configuration.""" return self.PRIORITY_MAP.get( self.record.get('priority'), self.config.default_priority ) def parse_date( - self, date: str | None, timezone='deprecated' + self, date: str | None, timezone: str = 'deprecated' ) -> datetime.datetime | None: """Parse a date string into a datetime object. @@ -201,7 +201,7 @@ def parse_date( return _date.replace(microsecond=0) def build_default_description( - self, title='', url='', number='', cls="issue" + self, title: str = '', url: str = '', number: str | int = '', cls: str = "issue" ) -> str: """Return a default description, respecting configuration options. @@ -236,7 +236,10 @@ def build_default_description( ) -class Service(abc.ABC): +T_Issue = TypeVar("T_Issue", bound="Issue") + + +class Service(abc.ABC, Generic[T_Issue]): """Base class for fetching issues from the service. The upper case attributes and abstract methods need to be defined by @@ -247,13 +250,13 @@ class Service(abc.ABC): #: Which version of the API does this service implement? API_VERSION: float #: Which class should this service instantiate for holding these issues? - ISSUE_CLASS: type[Issue] + ISSUE_CLASS: type[T_Issue] #: Which class defines this service's configuration options? CONFIG_SCHEMA: type[schema.ServiceConfig] def __init__( self, config: schema.ServiceConfig, main_config: schema.MainSectionConfig - ): + ) -> None: over_version = math.floor(LATEST_API_VERSION) + 1 if self.API_VERSION >= over_version: raise ValueError( @@ -270,7 +273,7 @@ def __init__( log.info("Working on [%s]", self.config.target) - def get_secret(self, key, login='nousername') -> str: + def get_secret(self, key: str, login: str = 'nousername') -> str: """Get a secret value, potentially from an :ref:`oracle `. The secret key need not be a *password*, per se. @@ -290,7 +293,9 @@ def get_secret(self, key, login='nousername') -> str: ) return password - def get_issue_for_record(self, record, extra=None) -> Issue: + def get_issue_for_record( + self, record: dict[str, Any], extra: dict[str, Any] | None = None + ) -> T_Issue: """Instantiate and return an issue for the given record. :param `record`: Foreign record. @@ -300,8 +305,8 @@ def get_issue_for_record(self, record, extra=None) -> Issue: return self.ISSUE_CLASS(record, self.config, self.main_config, extra=extra) def build_annotations( - self, annotations: Iterable, url: typing.Optional[str] = None - ) -> list: + self, annotations: Iterable[tuple[str, str]], url: Optional[str] = None + ) -> list[str]: """Format annotations, respecting configuration values. :param `annotations`: Comments from service. @@ -329,7 +334,7 @@ def build_annotations( return final @abc.abstractmethod - def issues(self): + def issues(self) -> Iterator[T_Issue]: """A generator yielding Issue instances representing issues from a remote service. Each item in the list should be a dict that looks something like this: @@ -373,20 +378,14 @@ class Client: """ @staticmethod - def json_response(response: requests.Response): + def json_response(response: requests.Response) -> Any: """Return json if response is OK.""" # If we didn't get good results, just bail. if response.status_code != 200: raise OSError( - "Non-200 status code %r; %r; %r" - % (response.status_code, response.url, response.text) + f"Non-200 status code {response.status_code}; {response.url}; {response.text}" ) - if callable(response.json): - # Newer python-requests - return response.json() - else: - # Older python-requests - return response.json + return response.json() # NOTE: __all__ determines the stable, public API. diff --git a/bugwarrior/services/azuredevops.py b/bugwarrior/services/azuredevops.py index 6b3e9e8a..3fb8b47c 100644 --- a/bugwarrior/services/azuredevops.py +++ b/bugwarrior/services/azuredevops.py @@ -2,7 +2,7 @@ import logging import re import sys -import typing +from typing import Annotated, Any, Iterator, Literal from urllib.parse import quote from pydantic import BeforeValidator @@ -13,11 +13,11 @@ log = logging.getLogger(__name__) -EscapedStr = typing.Annotated[str, BeforeValidator(quote)] +EscapedStr = Annotated[str, BeforeValidator(quote)] class AzureDevopsConfig(config.ServiceConfig): - service: typing.Literal['azuredevops'] + service: Literal['azuredevops'] PAT: str project: EscapedStr organization: EscapedStr @@ -26,12 +26,12 @@ class AzureDevopsConfig(config.ServiceConfig): wiql_filter: str = '' -def striphtml(data): +def striphtml(data: str) -> str: p = re.compile(r"<.*?>") return p.sub("", data) -def format_item(item): +def format_item(item: str | None) -> str | None: """Removes HTML Elements, splits by line""" if item: item_lines = re.split(r"
|| ", item) @@ -41,7 +41,7 @@ def format_item(item): class AzureDevopsClient(Client): - def __init__(self, pat, org, project, host): + def __init__(self, pat: str, org: str, project: str, host: str) -> None: if pat[0] != ":": pat = f":{pat}" self.pat = base64.b64encode(pat.encode("ascii")).decode("ascii") @@ -57,7 +57,7 @@ def __init__(self, pat, org, project, host): } self.params = {"api-version": "6.0-preview.2"} - def get_work_item(self, workitemid): + def get_work_item(self, workitemid: str | int) -> dict[str, Any]: queryset = self.params.copy() queryset.update({"$expand": "all"}) resp = self.session.get( @@ -65,7 +65,7 @@ def get_work_item(self, workitemid): ) return resp.json() - def get_work_items_from_query(self, query): + def get_work_items_from_query(self, query: str) -> list[int]: data = str({"query": query}) resp = self.session.post(f"{self.base_url}/wiql", data=data, params=self.params) if resp.status_code == 401: @@ -85,13 +85,16 @@ def get_work_items_from_query(self, query): sys.exit(1) return [workitem['id'] for workitem in resp.json()["workItems"]] - def get_workitem_comments(self, workitem): + def get_workitem_comments( + self, workitem: dict[str, Any] + ) -> list[dict[str, Any]] | None: comment_link = workitem["_links"]["workItemComments"]["href"] resp = self.session.get(comment_link) return resp.json().get("comments", None) - def get_parent_name(self, workitem): - parent_id = workitem.get("fields").get("System.Parent", None) + def get_parent_name(self, workitem: dict[str, Any]) -> str | None: + parent_id = workitem.get("fields", {}).get("System.Parent", None) + if parent_id: parent_item = self.get_work_item(parent_id) return parent_item["fields"]["System.Title"] @@ -130,15 +133,15 @@ class AzureDevopsIssue(Issue): } UNIQUE_KEY = (URL,) - PRIORITY_MAP = {"1": "H", "2": "M", "3": "L", "4": "L"} + PRIORITY_MAP: dict[str, config.Priority] = {"1": "H", "2": "M", "3": "L", "4": "L"} - def get_priority(self): + def get_priority(self) -> config.Priority: value = self.record["fields"].get( "Microsoft.VSTS.Common.Priority", self.config.default_priority ) return self.PRIORITY_MAP.get(value, self.config.default_priority) - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: return { "project": self.extra['project'], "priority": self.get_priority(), @@ -168,7 +171,7 @@ def to_taskwarrior(self): self.NAMESPACE: self.extra.get("namespace"), } - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.record["fields"]["System.Title"], url=self.record["_links"]["html"]["href"], @@ -177,12 +180,12 @@ def get_default_description(self): ) -class AzureDevopsService(Service): +class AzureDevopsService(Service[AzureDevopsIssue]): API_VERSION = 1.0 ISSUE_CLASS = AzureDevopsIssue CONFIG_SCHEMA = AzureDevopsConfig - def __init__(self, *args, **kw): + def __init__(self, *args: Any, **kw: Any) -> None: super().__init__(*args, **kw) self.client = AzureDevopsClient( pat=self.get_secret('PAT'), @@ -191,7 +194,7 @@ def __init__(self, *args, **kw): host=self.config.host, ) - def get_query(self): + def get_query(self) -> list[int]: default_query = "SELECT [System.Id] FROM workitems" # Test for Clauses, add WHERE if any exist @@ -222,7 +225,7 @@ def get_query(self): list_of_items = self.client.get_work_items_from_query(default_query) return list_of_items - def annotations(self, issue): + def annotations(self, issue: dict[str, Any]) -> list[str]: # Build Annotations based on comments by commenter and comment text url = issue["_links"]["html"]["href"] annotations = [] @@ -238,7 +241,7 @@ def annotations(self, issue): annotations.append((name, text)) return self.build_annotations(annotations, url) - def issues(self): + def issues(self) -> Iterator[AzureDevopsIssue]: issue_ids = self.get_query() for issue_id in issue_ids: issue = self.client.get_work_item(issue_id) @@ -254,5 +257,5 @@ def issues(self): yield issue_obj @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: AzureDevopsConfig) -> str: return f"azuredevops://{config.organization}@{config.host}" diff --git a/bugwarrior/services/bitbucket.py b/bugwarrior/services/bitbucket.py index 8e33bee2..e82fb562 100644 --- a/bugwarrior/services/bitbucket.py +++ b/bugwarrior/services/bitbucket.py @@ -1,5 +1,5 @@ import logging -import typing +from typing import Any, Iterator, Literal, Union from pydantic import model_validator import requests @@ -12,9 +12,9 @@ class BitbucketConfig(config.ServiceConfig): _DEPRECATE_FILTER_MERGE_REQUESTS = True - filter_merge_requests: typing.Union[bool, typing.Literal['Undefined']] = 'Undefined' + filter_merge_requests: Union[bool, Literal['Undefined']] = 'Undefined' - service: typing.Literal['bitbucket'] + service: Literal['bitbucket'] username: str @@ -26,13 +26,11 @@ class BitbucketConfig(config.ServiceConfig): include_repos: config.ConfigList = [] exclude_repos: config.ConfigList = [] - include_merge_requests: typing.Union[bool, typing.Literal['Undefined']] = ( - 'Undefined' - ) + include_merge_requests: Union[bool, Literal['Undefined']] = 'Undefined' project_owner_prefix: bool = False @model_validator(mode='after') - def deprecate_password_authentication(self): + def deprecate_password_authentication(self) -> "BitbucketConfig": if self.login != 'Undefined' or self.password != 'Undefined': log.warning( 'Bitbucket has disabled password authentication and, as such, ' @@ -62,7 +60,7 @@ class BitbucketIssue(Issue): 'blocker': 'H', } - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: return { 'project': self.extra['project'], 'priority': self.get_priority(), @@ -72,7 +70,7 @@ def to_taskwarrior(self): self.TITLE: self.record['title'], } - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.record['title'], url=self.extra['url'], @@ -81,7 +79,7 @@ def get_default_description(self): ) -class BitbucketService(Service, Client): +class BitbucketService(Service[BitbucketIssue], Client): API_VERSION = 1.0 ISSUE_CLASS = BitbucketIssue CONFIG_SCHEMA = BitbucketConfig @@ -89,7 +87,7 @@ class BitbucketService(Service, Client): BASE_API2 = 'https://api.bitbucket.org/2.0' BASE_URL = 'https://bitbucket.org/' - def __init__(self, *args, **kw): + def __init__(self, *args: Any, **kw: Any) -> None: super().__init__(*args, **kw) oauth = (self.config.key, self.get_secret('secret', self.config.key)) @@ -117,10 +115,10 @@ def __init__(self, *args, **kw): } @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: BitbucketConfig) -> str: return f"bitbucket://{config.key}/{config.username}" - def filter_repos(self, repo_tag): + def filter_repos(self, repo_tag: str) -> bool: repo = repo_tag.split('/').pop() if self.config.exclude_repos: @@ -135,29 +133,31 @@ def filter_repos(self, repo_tag): return True - def get_data(self, url): + def get_data(self, url: str) -> dict[str, Any]: """Perform a request to the fully qualified url and return json.""" return self.json_response(requests.get(url, **self.requests_kwargs)) - def get_collection(self, url): + def get_collection(self, url: str) -> Iterator[Any]: """Pages through an object collection from the bitbucket API. Returns an iterator that lazily goes through all the 'values' of all the pages in the collection.""" - url = self.BASE_API2 + url - while url is not None: - response = self.get_data(url) + next_url: str | None = self.BASE_API2 + url + while next_url is not None: + response = self.get_data(next_url) yield from response['values'] - url = response.get('next', None) + next_url = response.get('next', None) - def fetch_issues(self, tag): + def fetch_issues(self, tag: str) -> list[tuple[str, dict[str, Any]]]: response = self.get_collection('/repositories/%s/issues/' % (tag)) return [(tag, issue) for issue in response] - def fetch_pull_requests(self, tag): + def fetch_pull_requests(self, tag: str) -> list[tuple[str, dict[str, Any]]]: response = self.get_collection('/repositories/%s/pullrequests/' % tag) return [(tag, issue) for issue in response] - def get_annotations(self, tag, issue, issue_obj, url): + def get_annotations( + self, tag: str, issue: dict[str, Any], issue_obj: Issue, url: str + ) -> list[str]: response = self.get_collection( '/repositories/%s/pullrequests/%i/comments' % (tag, issue['id']) ) @@ -169,13 +169,14 @@ def get_annotations(self, tag, issue, issue_obj, url): url, ) - def get_owner(self, issue): - _, issue = issue - assignee = issue.get('assignee', None) + def get_owner(self, issue: tuple[str, dict[str, Any]]) -> str | None: + _, issue_dict = issue + assignee = issue_dict.get('assignee', None) if assignee is not None: return assignee.get('username', None) + return None - def include(self, issue): + def include(self, issue: tuple[str, dict[str, Any]]) -> bool: """Return true if the issue in question should be included""" if self.config.only_if_assigned: owner = self.get_owner(issue) @@ -188,7 +189,7 @@ def include(self, issue): return True - def issues(self): + def issues(self) -> Iterator[BitbucketIssue]: user = self.config.username response = self.get_collection('/repositories/' + user + '/') repo_tags = list( @@ -232,7 +233,7 @@ def issues(self): closed = ['rejected', 'fulfilled'] - def not_resolved(tup): + def not_resolved(tup: tuple[str, dict[str, Any]]) -> bool: return tup[1]['state'] not in closed pull_requests = list(filter(not_resolved, pull_requests)) diff --git a/bugwarrior/services/linear.py b/bugwarrior/services/linear.py index 65590c97..7139232b 100644 --- a/bugwarrior/services/linear.py +++ b/bugwarrior/services/linear.py @@ -114,9 +114,9 @@ def get_tags(self): def get_default_description(self): return self.build_default_description( - title=self.record.get("title"), - url=self.record.get("url"), - number=self.record.get("identifier"), + title=self.record.get("title", ""), + url=self.record.get("url", ""), + number=self.record.get("identifier", ""), cls="task", ) diff --git a/bugwarrior/services/pivotaltracker.py b/bugwarrior/services/pivotaltracker.py index 8002ec34..ebd552fa 100644 --- a/bugwarrior/services/pivotaltracker.py +++ b/bugwarrior/services/pivotaltracker.py @@ -100,10 +100,10 @@ def get_tags(self): def get_default_description(self): return self.build_default_description( - title=self.record.get('name'), - url=self.record.get('url'), + title=self.record.get('name', ''), + url=self.record.get('url', ''), number=int(self.record['id']), - cls=self.record.get('story_type'), + cls=self.record.get('story_type', 'issue'), ) diff --git a/pyproject.toml b/pyproject.toml index eb7ba34b..b114098e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ dependencies = [ "lockfile>=0.9.1", "pydantic[email]>=2", "python-dateutil", - "requests", + "requests>=1", # Needed at runtime: phabricator and ini2toml import pkg_resources, # which is no longer bundled in venvs since Python 3.12 (see #1150). "setuptools", @@ -88,7 +88,19 @@ quote-style = "preserve" skip-magic-trailing-comma = true [tool.ruff.lint] -select = ["E4", "E7", "E9", "F", "W", "I", "E501"] +select = ["E4", "E7", "E9", "F", "W", "I", "E501", "ANN"] +ignore = ["ANN401"] +# ANN rules (flake8-annotations) detect untyped code — useful for gradual typing. +# Currently enforced in: bugwarrior/config/, command.py, db.py, collect.py, notifications.py +# ANN401 (disallow Any) is too strict for wrappers, *args/**kwargs, and data storage. + +[tool.ruff.lint.per-file-ignores] +"tests/**" = ["ANN"] +"bugwarrior/docs/**" = ["ANN"] +"bugwarrior/services/{bts,bz,clickup,deck,gerrit,gitbug,github,gitlab,gmail,jira,kanboard,linear,logseq,pagure,phab,pivotaltracker,redmine,taiga,teamwork_projects,todoist,trac,trello,youtrack}.py" = ["ANN"] + +[tool.ruff.lint.flake8-annotations] +suppress-dummy-args = true [tool.ruff.lint.isort] combine-as-imports = true