Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions bugwarrior/collect.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
from collections.abc import Iterator
import copy
from functools import cache
from importlib.metadata import entry_points
import logging
import multiprocessing
import time
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from jinja2 import Template
from taskw.task import Task

if TYPE_CHECKING:
from bugwarrior.config.validation import Config
from bugwarrior.services import Service
from bugwarrior.services import Issue, Service

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -47,7 +48,7 @@ def get_service_instances(conf: "Config") -> list["Service"]:
]


def _aggregate_issues(service: "Service", queue: multiprocessing.Queue):
def _aggregate_issues(service: "Service", queue: multiprocessing.Queue) -> None:
"""This worker function is separated out from the main
:func:`aggregate_issues` func only so that we can use multiprocessing
on it for speed reasons.
Expand Down Expand Up @@ -80,7 +81,7 @@ def _aggregate_issues(service: "Service", queue: multiprocessing.Queue):
log.info(f"Done with [{target}] in {duration}.")


def aggregate_issues(conf: "Config", debug: bool):
def aggregate_issues(conf: "Config", debug: bool) -> Iterator[dict | tuple[str, str]]:
"""Return all issues from every target."""
log.info("Starting to aggregate remote issues.")

Expand Down Expand Up @@ -129,10 +130,10 @@ def aggregate_issues(conf: "Config", debug: bool):
class TaskConstructor:
"""Construct a taskwarrior task from a foreign record."""

def __init__(self, issue):
def __init__(self, issue: "Issue") -> None:
self.issue = issue

def get_added_tags(self):
def get_added_tags(self) -> list[str]:
added_tags = []
for tag in self.issue.config.add_tags:
tag = Template(tag).render(self.get_template_context())
Expand All @@ -141,7 +142,7 @@ def get_added_tags(self):

return added_tags

def get_taskwarrior_record(self, refined=True) -> dict:
def get_taskwarrior_record(self, refined: bool = True) -> dict[str, Any]:
if not getattr(self, '_taskwarrior_record', None):
self._taskwarrior_record = self.issue.to_taskwarrior()
record = copy.deepcopy(self._taskwarrior_record)
Expand All @@ -153,13 +154,13 @@ def get_taskwarrior_record(self, refined=True) -> dict:
record['tags'].extend(self.get_added_tags())
return record

def get_template_context(self):
def get_template_context(self) -> dict[str, Any]:
context = self.get_taskwarrior_record(refined=False).copy()
context.update(self.issue.extra)
context.update({'description': self.issue.get_default_description()})
return context

def refine_record(self, record):
def refine_record(self, record: dict[str, Any]) -> dict[str, Any]:
for field in Task.FIELDS.keys():
if field in self.issue.config.templates:
template = Template(self.issue.config.templates[field])
Expand Down
43 changes: 26 additions & 17 deletions bugwarrior/command.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from collections.abc import Callable, Iterator
import functools
import getpass
import logging
import os
import sys
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

import click
from lockfile import LockTimeout
Expand All @@ -22,7 +23,7 @@
lst = list


def _get_section_name(flavor):
def _get_section_name(flavor: str | None) -> str:
if flavor:
return 'flavor.' + flavor
return 'general'
Expand All @@ -46,10 +47,14 @@ def _try_load_config(
sys.exit(1)


def _legacy_cli_deprecation_warning(subcommand_callback):
def _legacy_cli_deprecation_warning(
subcommand_callback: Callable[..., Any],
) -> Callable[..., Any]:
@functools.wraps(subcommand_callback)
@click.pass_context
def wrapped_subcommand_callback(ctx, *args, **kwargs):
def wrapped_subcommand_callback(
ctx: click.Context, *args: Any, **kwargs: Any
) -> Any:
if ctx.find_root().command_path != 'bugwarrior':
old_command = ctx.command_path
new_command = ctx.command_path.replace('-', ' ')
Expand All @@ -71,16 +76,18 @@ class AliasedCli(click.Group):
with the old cli api.
"""

def list_commands(self, ctx):
return ctx.command.commands.keys()
def list_commands(self, ctx: click.Context) -> list[str]:
assert isinstance(ctx.command, click.Group)
return list(ctx.command.commands)

def get_command(self, ctx, cmd_name):
return ctx.command.commands[cmd_name]
def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None:
assert isinstance(ctx.command, click.Group)
return ctx.command.commands.get(cmd_name)


@click.command(cls=AliasedCli)
@click.version_option()
def cli():
def cli() -> None:
pass


Expand All @@ -93,7 +100,9 @@ def cli():
)
@click.option('--quiet', is_flag=True, help='Set logging level to WARNING.')
@_legacy_cli_deprecation_warning
def pull(dry_run, flavor, interactive, debug, quiet):
def pull(
dry_run: bool, flavor: str | None, interactive: bool, debug: bool, quiet: bool
) -> None:
"""Pull down tasks from forges and add them to your taskwarrior tasks.

Relies on configuration file.
Expand Down Expand Up @@ -128,7 +137,7 @@ def pull(dry_run, flavor, interactive, debug, quiet):

@cli.group()
@_legacy_cli_deprecation_warning
def vault():
def vault() -> None:
"""Password/keyring management for bugwarrior.

If you use the keyring password oracle in your bugwarrior config, this tool
Expand All @@ -138,7 +147,7 @@ def vault():
pass


def targets():
def targets() -> Iterator[str]:
config = _try_load_config('general')
for service_config in config.service_configs:
for value in dict(service_config).values():
Expand All @@ -149,7 +158,7 @@ def targets():


@vault.command()
def list():
def list() -> None:
pws = lst(targets())
print("%i @oracle:use_keyring passwords in bugwarriorrc" % len(pws))
for section in pws:
Expand All @@ -159,7 +168,7 @@ def list():
@vault.command()
@click.argument('target')
@click.argument('username')
def clear(target, username):
def clear(target: str, username: str) -> None:
target_list = lst(targets())
if target not in target_list:
raise ValueError("%s must be one of %r" % (target, target_list))
Expand All @@ -175,7 +184,7 @@ def clear(target, username):
@vault.command()
@click.argument('target')
@click.argument('username')
def set(target, username):
def set(target: str, username: str) -> None:
target_list = lst(targets())
if target not in target_list:
log.warning(
Expand All @@ -192,7 +201,7 @@ def set(target, username):
@cli.command()
@click.option('--flavor', default=None, help='The flavor to use')
@_legacy_cli_deprecation_warning
def uda(flavor):
def uda(flavor: str | None) -> None:
"""
List bugwarrior-managed uda's.

Expand Down Expand Up @@ -226,7 +235,7 @@ def uda(flavor):
@click.argument(
'rcfile', required=False, default=get_config_path(), type=click.Path(exists=True)
)
def ini2toml(rcfile):
def ini2toml(rcfile: str) -> None:
"""Convert ini bugwarriorrc to toml and print result to stdout."""
try:
from ini2toml.api import Translator
Expand Down
1 change: 1 addition & 0 deletions bugwarrior/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
ExpandedPath, # noqa: F401
MainSectionConfig,
NoSchemeUrl, # noqa: F401
Priority, # noqa: F401
ServiceConfig,
StrippedTrailingSlashUrl, # noqa: F401
TaskrcPath, # noqa: F401
Expand Down
17 changes: 10 additions & 7 deletions bugwarrior/config/data.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import json
import os
from pathlib import Path
import subprocess
import typing

from lockfile.pidlockfile import PIDLockFile


def get_data_path(taskrc):
def get_data_path(taskrc: str | Path) -> str:
# We cannot use the taskw module here because it doesn't really support
# the `_` subcommands properly (`rc:` can't be used for them).
line_prefix = 'data.location='

# Take a copy of the environment and add our taskrc to it.
env = dict(os.environ)
env['TASKRC'] = taskrc
env['TASKRC'] = str(taskrc)

tw_show = subprocess.Popen(('task', '_show'), stdout=subprocess.PIPE, env=env)
data_location = subprocess.check_output(
Expand All @@ -36,31 +37,33 @@ class BugwarriorData:
key-value store.
"""

def __init__(self, data_path):
def __init__(self, data_path: str) -> None:
self._datafile = os.path.join(data_path, 'bugwarrior.data')
self._lockfile = os.path.join(data_path, 'bugwarrior-data.lockfile')
#: Taskwarrior's ``data.location`` configuration value. If necessary,
#: services can manage their own files here.
self.path = data_path

@classmethod
def __get_pydantic_json_schema__(cls, core_schema, handler):
def __get_pydantic_json_schema__(
cls, core_schema: typing.Any, handler: typing.Any
) -> dict[str, str]:
"""Fix schema generation in pydantic v2."""
return {"type": "object", "description": "Local data storage"}

def get_data(self) -> dict:
def get_data(self) -> dict[str, typing.Any]:
"""Return all data from the ``bugwarrior.data`` file."""
with open(self._datafile) as jsondata:
return json.load(jsondata)

def get(self, key) -> typing.Any:
def get(self, key: str) -> typing.Any:
"""Return a value stored in the ``bugwarrior.data`` file."""
try:
return self.get_data()[key]
except OSError: # File does not exist.
return None

def set(self, key, value):
def set(self, key: str, value: typing.Any) -> None:
"""Set a value in the ``bugwarrior.data`` file."""
with PIDLockFile(self._lockfile):
try:
Expand Down
14 changes: 8 additions & 6 deletions bugwarrior/config/ini2toml_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
log = logging.getLogger(__name__)


def to_type(section: IntermediateRepr, key: str, converter: typing.Callable):
def to_type(section: IntermediateRepr, key: str, converter: typing.Callable) -> None:
try:
val = section[key]
except KeyError:
Expand All @@ -27,15 +27,15 @@ def to_type(section: IntermediateRepr, key: str, converter: typing.Callable):
section[key] = converter(val)


def to_bool(section: IntermediateRepr, key: str):
def to_bool(section: IntermediateRepr, key: str) -> None:
to_type(section, key, pydantic.TypeAdapter(bool).validate_python)


def to_int(section: IntermediateRepr, key: str):
def to_int(section: IntermediateRepr, key: str) -> None:
to_type(section, key, int)


def to_list(section: IntermediateRepr, key: str):
def to_list(section: IntermediateRepr, key: str) -> None:
to_type(section, key, parse_config_list)


Expand All @@ -53,7 +53,9 @@ def get_field_type(attrs: dict) -> typing.Optional[str]:
return None


def convert_section(section: IntermediateRepr, schema: type[pydantic.BaseModel]):
def convert_section(
section: IntermediateRepr, schema: type[pydantic.BaseModel]
) -> None:
for prop, attrs in schema.model_json_schema()['properties'].items():
field_type = get_field_type(attrs)
if field_type == 'boolean':
Expand Down Expand Up @@ -129,7 +131,7 @@ def unquote_flavors(file_contents: str) -> str:
)


def activate(translator: Translator):
def activate(translator: Translator) -> None:
profile = translator["bugwarriorrc"]
profile.help_text = "Convert 'bugwarriorrc' files to 'bugwarrior.toml'"
profile.intermediate_processors.append(process_values)
Expand Down
Loading
Loading