diff --git a/CMakeLists.txt b/CMakeLists.txt index a0eca42..ebbe3c3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.0.2) +cmake_minimum_required(VERSION 3.8.0) project(pgp-test) set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake/Modules/") @@ -40,3 +40,5 @@ if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 8.2) set_property(TARGET generate_derived_key PROPERTY CXX_STANDARD 20) add_compile_options(-fconcepts -DHAVE_CPP20_CONCEPTS) endif() + +add_subdirectory(tests) diff --git a/README.md b/README.md index 06f82dc..6850759 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,16 @@ -PGP key generation -================== +# PGP key generation This repository provides the source for a utility used for creating PGP keys using libsodium. The keys generated can be deterministic. -DEPENDENCIES -============ +## Dependencies -This repository depends only on the pgp-packet-library - and the -dependencies it has. +The source code can be built using only the dependencies of the +pgp-packet-library. The integration testing script, which can be run using +`make test` in the build folder, additionally requires Python 3.7 (or 3.6 with +the `dataclasses` library) and GnuPG to be installed. -GENERATING NEW KEYS -=============== +## Generating new keys - If you have a new smartcard, change the user and admin pin first. See: https://www.gnupg.org/howtos/card-howto/en/ch03s02.html @@ -54,7 +53,6 @@ You should now have a functional key. You can test it as follows: - gpg -r [key id] --encrypt test.txt - gpg --decrypt test.txt.gpg -UPDATING EXISTING KEYS -=============== +## Updating existing keys If you want to change the expiry date of existing keys, you can simply follow the steps above again to generate a new key with a different expiry date, using your encrypted seed and passphrase. diff --git a/generate_derived_key.cpp b/generate_derived_key.cpp index ad2d9aa..cd4f400 100644 --- a/generate_derived_key.cpp +++ b/generate_derived_key.cpp @@ -242,11 +242,12 @@ namespace { Options options; // description of the options for the boost option parser - po::options_description optdesc; + // Set the line length to 100 for slightly wider option descriptions + po::options_description optdesc(100); optdesc.add_options() ("help,h", "Produce help message") ("output-file,o", po::value>(&options.output_file), "Output file") - ("key-type,t", po::value> (&options.type), "Type of the generated key (eddsa/ecdsa)") + ("key-type,t", po::value> (&options.type), "Type of the generated key (eddsa/ecdsa/rsa{2048,4096,8192})") ("name,n", po::value>(&options.user_name), "Your name (firstname lastname)") ("email,e", po::value>(&options.user_email), "Your email address") ("sigtime,s", po::value> (&options.signature_creation), "Signature creation time in UTC (YYYY-MM-DD HH:MM:SS)") @@ -286,7 +287,7 @@ namespace { // ensure that all the options are initialized by possibly reading some from standard input options.output_file .ensure_prompt("Output file"); - options.type .ensure_prompt("Type of the generated key (eddsa/ecdsa)"); + options.type .ensure_prompt("Type of the generated key (eddsa/ecdsa/rsa{2048,4096,8192})"); options.user_name .ensure_prompt("Your name (firstname lastname)"); options.user_email .ensure_prompt("Your email address"); options.signature_creation .ensure_prompt("Signature creation time in UTC (YYYY-MM-DD HH:MM:SS)"); diff --git a/tests/.gitignore b/tests/.gitignore new file mode 100644 index 0000000..c18dd8d --- /dev/null +++ b/tests/.gitignore @@ -0,0 +1 @@ +__pycache__/ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt new file mode 100644 index 0000000..6f9393e --- /dev/null +++ b/tests/CMakeLists.txt @@ -0,0 +1,5 @@ +add_custom_target(integration-test + COMMAND ${CMAKE_CURRENT_LIST_DIR}/integration_test.py ${CMAKE_BINARY_DIR}/generate_derived_key) + +add_custom_target(test + DEPENDS generate_derived_key integration-test) diff --git a/tests/date_utils.py b/tests/date_utils.py new file mode 100644 index 0000000..75f325b --- /dev/null +++ b/tests/date_utils.py @@ -0,0 +1,22 @@ +import datetime + + +def is_leap_year(year): + return year % 4 == 0 and (year % 100 != 0 or year % 400 == 0) + +def days_in_month(year, month): + if month == 2: + if is_leap_year(year): return 29 + else: return 28 + return [31, None, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31][month - 1] + +# Convert a string date representation to a UNIX timestamp +def date_to_unix(string): + return int( + # parse the string into a datetime object + datetime.datetime.strptime(string, "%Y-%m-%d %H:%M:%S") + # tell it to consider itself a UTC time + .replace(tzinfo = datetime.timezone.utc) + # obtain the timestamp + .timestamp() + ) diff --git a/tests/generate.py b/tests/generate.py new file mode 100644 index 0000000..7e647a4 --- /dev/null +++ b/tests/generate.py @@ -0,0 +1,67 @@ +import random + +from date_utils import * + + +# Class hierarchy for randomly generating various kinds of data for input into +# the program +class Generate: + def generate(): + raise NotImplementedError() + +class GenerateString(Generate): + def generate(): + length = random.randint(1, 200) + return "".join([chr(random.randint(ord(' '), ord('~'))) for _ in range(length)]) + +class GenerateName(GenerateString): + pass + +class GenerateEmail(GenerateString): + pass + +class GenerateDate(Generate): + def generate(): + while True: + year = random.randint(1990, 2100) + month = random.randint(1, 12) + day = random.randint(1, days_in_month(year, month)) + hour = random.randint(0, 23) + minute = random.randint(0, 59) + second = random.randint(0, 59) + string = "{:04}-{:02}-{:02} {:02}:{:02}:{:02}".format(year, month, day, hour, minute, second) + if date_to_unix(string) >= 1511740800: # TODO: Change to variable date + return string + +class GenerateDatePair(Generate): + def generate(): + while True: + values = [GenerateDate.generate(), GenerateDate.generate()] + # if values[0] == values[1]: + # continue + values.sort() + return tuple(values) + +class GenerateDie(Generate): + def generate(): + return random.randint(1, 6) + +class GenerateDice(Generate): + def generate(): + length = 100 if random.randint(0, 1) == 0 else random.randint(100, 1000) + return "".join(str(GenerateDie.generate()) for _ in range(length)) + +class GenerateSymmetricKey(GenerateString): + pass + +class GenerateInput(): + def generate(): + datepair = GenerateDatePair.generate() + return { + "name": GenerateName.generate(), + "email": GenerateEmail.generate(), + "creation": datepair[0], + "expiration": datepair[1], + "dice": GenerateDice.generate(), + "key": GenerateSymmetricKey.generate(), + } diff --git a/tests/integration_test.py b/tests/integration_test.py new file mode 100755 index 0000000..bb4923e --- /dev/null +++ b/tests/integration_test.py @@ -0,0 +1,491 @@ +#!/usr/bin/env python3 + +import dataclasses, filecmp, re, os, random, subprocess, shutil, sys, tempfile, time +from dataclasses import dataclass +from typing import List, Tuple + +from date_utils import * +from generate import * +from packet_parser import * + + +# Specification for an execution of the program +@dataclass +class AppInput(Generate): + key_type: str + name: str + email: str + creation: str + expiration: str + dice: str + key: str + + # Generate an input specification given a key type + def generate(key_type): + values = GenerateInput.generate() + return AppInput( + key_type, + values["name"], + values["email"], + values["creation"], + values["expiration"], + values["dice"], + values["key"] + ) + + +# A file name that is very unlikely to be chosen again in this same process +def safe_temporary_name(): + return "tmp_" + str(time.process_time()) + "_" + str(random.random()) + ".tmp" + +# Create a new file with random bytes as content +def make_random_file(workdir, size): + fname = os.path.join(workdir, safe_temporary_name()) + with open(fname, "wb") as f: + f.write(bytes(random.choices(range(0, 256), k = size))) + return fname + + +# Context manager for interacting with a process line-wise +class Application: + # kwargs: + # - stderr: either of: + # - None to send stderr to the terminal + # - subprocess.STDOUT to join stderr into stdout + # - subprocess.PIPE (internal, for subclasses) + def __init__(self, exec_name, args, **kwargs): + self._args = [exec_name] + args + self._stderr = kwargs.get("stderr") + self._line_filter = None + + def __enter__(self): + self._proc = subprocess.Popen( + self._args, + stdin = subprocess.PIPE, + stdout = subprocess.PIPE, + stderr = self._stderr + ) + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + try: + self._proc.wait(timeout = 1) + except subprocess.TimeoutExpired: + self._proc.kill() + + def write_data(self, data): + self._proc.stdin.write(data) + self._proc.stdin.flush() + + def write_line(self, line): + if "\n" in line: + raise Exception("Invalid newline in Application.write_line()") + self._proc.stdin.write((line + "\n").encode("utf8")) + self._proc.stdin.flush() + + def read_line(self, line, timeout_ms = 1000): + while True: + line = self._proc.stdout.read_line().decode("utf8") + if line[-1] == "\n": + line = line[:-1] + if self._line_filter is None or self._line_filter(line): + break + return line + + def read_all(self): + lines = self._proc.stdout.read().decode("utf8").split("\n") + if self._line_filter is None: + return "\n".join(lines) + else: + return "\n".join(line for line in lines if self._line_filter(line)) + +class KeygenApplication(Application): + def __init__(self, exec_name, keyfile, appinput): + args = [ + "-o", keyfile, + "-t", appinput.key_type, + "-n", appinput.name, + "-e", appinput.email, + "-s", appinput.creation, + "-x", appinput.expiration + ] + super().__init__(exec_name, args) + +class GPGApplication(Application): + # kwargs: + # - also_stderr: if True, join the stderr stream into the stdout stream. Incompatible with ignore_stderr. + # - ignore_stderr: if True, pass stderr to /dev/null. Incompatible with also_stderr. + # - gpg_homedir: if not None, directory to use as GPG homedir. If not given, uses a new temporary directory. + def __init__(self, args, **kwargs): + if kwargs.get("gpg_homedir") is not None: + self._gpg_homedir = None + self._gpg_homedir_name = kwargs.get("gpg_homedir") + else: + self._gpg_homedir = tempfile.TemporaryDirectory() + self._gpg_homedir_name = self._gpg_homedir.name + + if kwargs.get("also_stderr") and kwargs.get("ignore_stderr"): + raise Exception("Cannot pass both also_stderr and ignore_stderr to GPGApplication") + + if kwargs.get("also_stderr"): + super().__init__("gpg", ["--homedir", self._gpg_homedir_name] + args, stderr = subprocess.STDOUT) + self._line_filter = lambda line: re.match(r"^gpg: (keybox '.*' created|.*: trustdb created)$", line) is None + + self._gpg_should_grep = False + elif kwargs.get("ignore_stderr"): + super().__init__("gpg", ["--homedir", self._gpg_homedir_name] + args, stderr = subprocess.DEVNULL) + + self._gpg_should_grep = False + else: + super().__init__("gpg", ["--homedir", self._gpg_homedir_name] + args, stderr = subprocess.PIPE) + + self._gpg_should_grep = True + + def __enter__(self): + super().__enter__() + if self._gpg_should_grep: + # Hack: filter out some unnecessary lines with grep + subprocess.Popen(["grep", "-v", r"^gpg: \(keybox '.*' created\|.*: trustdb created\)$"], stdin = self._proc.stderr) + return self + + def __exit__(self, *args): + # If we created a temporary directory for the GPG homedir in __init__, clean it up here + if self._gpg_homedir is not None: + self._gpg_homedir.cleanup() + + super().__exit__(*args) + +def gracefully_terminate(proc): + proc.terminate() + try: + proc.wait(timeout = 1) + except subprocess.TimeoutExpired: + proc.kill() + assert proc.wait() is not None + +class SSHD: + def __init__(self): + self._tempdir = tempfile.TemporaryDirectory() + self._tempdir_name = self._tempdir.name + + self._port = 2222 + self._config_file = os.path.join(self._tempdir_name, "sshd_config") + self._auth_keys_file = os.path.join(self._tempdir_name, "authorized_keys") + self._host_key_file = os.path.join(self._tempdir_name, "ssh_host_rsa_key") + + # Create the authorized_keys file so that we can set its permissions to 600 + with open(self._auth_keys_file, "w") as f: pass + os.chmod(self._auth_keys_file, 0o600) + + def __enter__(self): + # Write the sshd_config file + with open(self._config_file, "w") as f: + f.write("""Port {} +AuthorizedKeysFile {} +ChallengeResponseAuthentication no +PasswordAuthentication no +UsePAM no +HostKey {} +ForceCommand true +StrictModes no +""".format(self._port, self._auth_keys_file, self._host_key_file)) + + # Generate the host key + subprocess.check_call(["ssh-keygen", "-t", "rsa", "-N", "", "-f", self._host_key_file]) + + # Start sshd + self._proc = subprocess.Popen(["/usr/bin/sshd", "-ddd", "-f", self._config_file], stderr = open("sshd_output.txt", "w")) + + return self + + def __exit__(self, *args): + gracefully_terminate(self._proc) + self._tempdir.cleanup() + + def set_authorized_key(self, key_string): + with open(self._auth_keys_file, "w") as f: + f.write(key_string + "\n") + print("### file " + self._auth_keys_file + " contains '" + key_string + "'") + + def port(self): + return self._port + + def host_key_entry(self): + with open(self._host_key_file + ".pub") as f: + return "127.0.0.1 " + f.read().strip() + +class GPGAgent: + def __init__(self, homedir): + self._homedir = homedir + + def __enter__(self): + self._proc = subprocess.Popen(["gpg-agent", "--homedir", self._homedir, "--daemon"]) + + def __exit__(self, *args): + gracefully_terminate(self._proc) + + +def parse_pgp_packet(filename): + # Parse the packet stream using gpg + with GPGApplication(["--list-packets", "--verbose", filename]) as app: + output = app.read_all().split("\n") + + # then parse gpg's output + return parse_gpg_packet_listing(output) + +# Passes all keyword arguments on to GPGApplication. +def import_gpg_packet(filename, **kwargs): + with GPGApplication(["--import", filename], also_stderr = True, **kwargs) as app: + output = app.read_all().split("\n") + + l = [ + re.match(r'^gpg: key [0-9A-F]*: public key ".*" imported$', output[0]), + re.match(r'^gpg: key [0-9A-F]*: secret key imported$', output[1]), + re.match(r'^gpg: Total number processed: 1$', output[2]), + re.match(r'^gpg: imported: 1$', output[3]), + re.match(r'^gpg: secret keys read: 1$', output[4]), + re.match(r'^gpg: secret keys imported: 1$', output[5]), + ] + + if not all(l): + # for debugging + print(l) + + return all(l) + +# Passes all keyword arguments on to GPGApplication. +def sign_encrypt_file(keyid, message_fname, output_fname, **kwargs): + # Remove the output file if it already exists + if os.access(output_fname, os.F_OK): + os.remove(output_fname) + + with GPGApplication([ + "--sign", "--encrypt", # sign and encrypt + "--local-user", keyid, # using this key + "-r", keyid, # encrypt for the same key + "-o", output_fname, # write the result to this file + "--trusted-key", keyid, # trust our key (otherwise GPG won't encrypt for it) + message_fname + ], ignore_stderr = True, **kwargs) as app: + # Ignore the output + app.read_all() + + # The output file should now only exist if the operation succeeded + return os.access(output_fname, os.F_OK) + +# Passes all keyword arguments on to GPGApplication. +def decrypt_file(encrypted_fname, output_fname, **kwargs): + # Remove the output file if it already exists + if os.access(output_fname, os.F_OK): + os.remove(output_fname) + + with GPGApplication(["--decrypt", "-o", output_fname, encrypted_fname], ignore_stderr = True, **kwargs) as app: + # Ignore the output + app.read_all() + + # The output file should now only exist if the operation succeeded + return os.access(output_fname, os.F_OK) + +# Passes all keyword arguments on to GPGApplication. +def export_ssh_authorized_key(keyid, **kwargs): + with GPGApplication(["--export-ssh-key", keyid], **kwargs) as app: + return app.read_all().strip() + +# Passes all keyword arguments on to GPGApplication. +def export_gpg_secret_key(keyid, **kwargs): + with GPGApplication(["--armor", "--export-secret-key", keyid], **kwargs) as app: + return app.read_all() + +# Passes all keyword arguments on to GPGApplication. +def get_keygrip(keyid, **kwargs): + with GPGApplication(["--list-keys", "--with-colons", "--with-keygrip", keyid], **kwargs) as app: + output = app.read_all() + + current_id = None + + for line in output.split("\n"): + if len(line) == 0: + continue + + fields = line.split(":") + if fields[0] == "pub" or fields[0] == "sub": + # Field 4 is the key ID + current_id = fields[4] + elif fields[0] == "grp": + if current_id == keyid: + # Field 9 is the keygrip + return fields[9] + + return None + +# Use the specification to generate an initial key and its recovery seed +def generate_initial_key(workdir, exec_name, appinput): + keyfile = os.path.join(workdir, safe_temporary_name()) + + with KeygenApplication(exec_name, keyfile, appinput) as app: + app.write_line("") # generate a new key, no recovery seed + app.write_line(appinput.dice) + app.write_line(appinput.key) + + text = app.read_all() + idx1 = text.find("write down the following recovery seed:") + idx2 = text.rfind("write down the following recovery seed:") + assert idx1 == idx2 + + seed_start = text.find(":", idx1) + 2 + seed = text[seed_start:].split("\n")[0] + + return keyfile, seed + +# Use the specification to regenerate the previous key from its recovery seed +def regenerate_key(workdir, exec_name, appinput, rec_seed): + keyfile = os.path.join(workdir, safe_temporary_name()) + + with KeygenApplication(exec_name, keyfile, appinput) as app: + app.write_line(rec_seed) # regenerate a previous key from a recovery seed + app.write_line(appinput.key) # with this symmetric key + + return keyfile + + +def report_error(appinput, keyfile): + print(appinput) + fname = "integration_test_keyfile_on_error_{}".format(int(time.time())) + shutil.copy(keyfile, fname) + print("Generated key file copied to '{}'".format(fname)) + +def run_test(exec_name, key_class, sshd): + with tempfile.TemporaryDirectory() as tempdir: + # --- Ensure that the temporary directory is only rwx by us + os.chmod(tempdir, 0o700) + + # --- Generate a new input set + appinput = AppInput.generate(key_class) + + # --- Generate the key, and regenerate the key + keyfile1, rec_seed = generate_initial_key(tempdir, exec_name, appinput) + keyfile2 = regenerate_key(tempdir, exec_name, appinput, rec_seed) + + # --- Parse the keys using GPG and check equivalence + parsed1 = parse_pgp_packet(keyfile1) + parsed2 = parse_pgp_packet(keyfile2) + # Note that this equality does what we want: the 'data' fields + # of signatures are not included in the comparison. + if parsed1 != parsed2: + print("Key recovery didn't work") + report_error(appinput, keyfile1) + return False + + # --- Extract the main key id + assert isinstance(parsed1[0], SecretKeyPacket) + keyid = parsed1[0].keyid + + # --- Check sanity of the signature creation timestamp + creation_stamp = date_to_unix(appinput.creation) + + for packet in parsed1: + if isinstance(packet, SignaturePacket): + if packet.created != creation_stamp: + print("Signature creation timestamp is incorrect") + print(packet.created) + print(creation_stamp) + report_error(appinput, keyfile1) + return False + + # --- Now we wish to perform more extensive testing on the + # generated key after it is imported, so we create a + # dedicated GPG homedir for gpg to store its state in + with tempfile.TemporaryDirectory() as gpg_homedir: + # We also need a GPG agent in that directory + with GPGAgent(gpg_homedir): + # --- Test importing a key + if not import_gpg_packet(keyfile1, gpg_homedir = gpg_homedir): + print("Key import didn't work") + report_error(appinput, keyfile1) + return False + + # --- Test signing and encrypting data + message_fname = make_random_file(tempdir, 1000) + output_fname = os.path.join(tempdir, safe_temporary_name()) + if not sign_encrypt_file(keyid, message_fname, output_fname, gpg_homedir = gpg_homedir): + print("Sign+encrypt didn't work") + report_error(appinput, keyfile1) + return False + + # --- Test decrypting (and verifying) the file created above + decrypt_fname = os.path.join(tempdir, safe_temporary_name()) + if not decrypt_file(output_fname, decrypt_fname, gpg_homedir = gpg_homedir): + print("Decrypt didn't work") + report_error(appinput, keyfile1) + return False + + # --- Check whether decryption yielded the original file again + if not filecmp.cmp(message_fname, decrypt_fname, shallow = False): + print("Decryption produced a different file than was encrypted") + report_error(appinput, keyfile1) + return False + + # --- Now test the authentication subkey. + # Find the authentication subkey + auth_keyid = find_key_with_flags(parsed1, 0x20) + if auth_keyid is None: + print("No purely-authentication subkey found") + report_error(appinput, keyfile1) + return False + + # Inform gpg-agent that this authentication subkey should be used for SSH authentication + auth_keygrip = get_keygrip(auth_keyid, gpg_homedir = gpg_homedir) + with open(os.path.join(gpg_homedir, "sshcontrol"), "w") as f: + f.write(auth_keygrip + "\n") + + # Get the name of the socket that gpg-agent listens on for SSH authentication + ssh_auth_sock = subprocess.check_output(["gpgconf", "--homedir", gpg_homedir, "--list-dirs", "agent-ssh-socket"]) \ + .strip() + + print("### ssh_auth_sock = {}".format(ssh_auth_sock)) + + # Export the ssh key using gpg and authorize it in the sshd daemon + # (note that this automatically selects the authentication subkey) + ssh_authorized_key_string = export_ssh_authorized_key(keyid, gpg_homedir = gpg_homedir) + sshd.set_authorized_key(ssh_authorized_key_string) + + # Get the host key and write it to a file + ssh_host_key_entry = sshd.host_key_entry() + ssh_known_hosts_name = os.path.join(tempdir, safe_temporary_name()) + with open(ssh_known_hosts_name, "w") as f: f.write(ssh_host_key_entry + "\n") + + # Try logging in + try: + ssh_env = os.environ.copy() + ssh_env["SSH_AUTH_SOCK"] = ssh_auth_sock + subprocess.check_call(["ssh", "-vv", "-o", "CheckHostIP=no", "-o", "UserKnownHostsFile=" + ssh_known_hosts_name, "-p", str(sshd.port()), "127.0.0.1"], env = ssh_env) + except subprocess.CalledProcessError: + print("ssh failed, possibly due to invalid authentication subkey?") + report_error(appinput, keyfile1) + return False + + return True + + +def main(): + if len(sys.argv) == 2: + exec_name = sys.argv[1] + else: + print("Usage: {} ", file = sys.stderr) + sys.exit(1) + + num_tests = 20 + key_classes = ["eddsa", "ecdsa", "rsa2048"] + + with SSHD() as sshd: + for key_class in key_classes: + print("Running {} random tests for {}...".format(num_tests, key_class)) + + for test_index in range(num_tests): + if not run_test(exec_name, key_class, sshd): + sys.exit(1) + + print("Succeeded!") + +if __name__ == "__main__": + main() diff --git a/tests/packet_parser.py b/tests/packet_parser.py new file mode 100644 index 0000000..cff969a --- /dev/null +++ b/tests/packet_parser.py @@ -0,0 +1,199 @@ +import dataclasses, re +from dataclasses import dataclass +from typing import List, Tuple + + +class Subpacket: + pass + +@dataclass +class SigCreatedSubpacket(Subpacket): + date: str + +@dataclass +class KeyExpirationSubpacket(Subpacket): + expires: str + +@dataclass +class KeyFlagsSubpacket(Subpacket): + flags: int + +@dataclass +class IssuerSubpacket(Subpacket): + issuer: str + +@dataclass +class IssuerFingerprintSubpacket(Subpacket): + issuerFingerprint: str + +@dataclass +class PrimaryKeyBindingSubpacket(Subpacket): + sigclass: int + algo: int + digestalgo: int + + +class Packet: + pass + +@dataclass +class SecretKeyPacket(Packet): + version: int + algo: str + created: int + expires: int + keys: List[Tuple[str, int, str]] + checksum: str + keyid: str + +@dataclass +class UserIDPacket(Packet): + userid: str + +@dataclass +class SignaturePacket(Packet): + algo: int + keyid: str + version: int + created: int + md5len: int + sigclass: int + digest: Tuple[int, str] + hashed_subs: List[Subpacket] + unhashed_subs: List[Subpacket] + # Do not compare the data fields, because the actual signature data does + # not need to be deterministic -- thus equality of that data is not really + # informative + datas: List[str] = dataclasses.field(compare = False) + + +def find_key_with_flags(packets, target_flags): + for packet in packets: + if isinstance(packet, SignaturePacket): + flags = [subpacket.flags + for subpacket in packet.hashed_subs + if isinstance(subpacket, KeyFlagsSubpacket)] + if len(flags) == 0: + raise Exception("No key_flags subpacket in signature of keyid {}".format(packet.keyid)) + if len(flags) > 1: + raise Exception("Multiple key_flags subpackets in signature of keyid {}".format(packet.keyid)) + if flags[0] == target_flags: + return packet.keyid + + return None + + +def parse_gpg_packet_listing(output): + # Ignore the offset comments + output = [line for line in output if not line.startswith("#")] + # Where are we currently in the output + cursor = 0 + + # First a :type: line with possible extra text, then an indented block + # This updates the 'cursor' variable in this scope + # Returns: (type, extra text, [block lines]) + def read_packet(): + nonlocal cursor + + match = re.match(r"^:([^:]*):(.*)$", output[cursor]) + assert match is not None + + cursor += 1 + + lines = [] + while cursor < len(output) and output[cursor][:1].isspace(): + lines.append(output[cursor].lstrip()) + cursor += 1 + + return match.group(1), match.group(2), lines + + # Returns the subpacket parsed + def parse_subpacket(typeid, text): + if typeid == 2: # sig created + match = re.match(r"^sig created (.*)$", text) + return SigCreatedSubpacket(match.group(1)) + elif typeid == 9: # key expires + match = re.match(r"^key expires after (.*)$", text) + return KeyExpirationSubpacket(match.group(1)) + elif typeid == 27: # key flags + match = re.match(r"^key flags: (.*)$", text) + return KeyFlagsSubpacket(int(match.group(1), 16)) + elif typeid == 16: # issuer + match = re.match(r"^issuer key ID (.*)$", text) + return IssuerSubpacket(match.group(1)) + elif typeid == 32: # primary key binding (signature) + match = re.match(r"^signature: v4, class ([^,]*), algo ([^,]*), digest algo (.*)$", text) + return PrimaryKeyBindingSubpacket(int(match.group(1), 16), int(match.group(2)), int(match.group(3))) + elif typeid == 33: # issuer fingerprint + match = re.match(r"^issuer fpr v4 (.*)$", text) + return IssuerFingerprintSubpacket(match.group(1)) + else: + raise Exception("Unrecognised subpacket id " + str(typeid)) + + # Returns the packet parsed + def interpret_packet(typestr, extra_text, block_lines): + if typestr == "secret key packet" or typestr == "secret sub key packet": + res = SecretKeyPacket(-1, "", -1, -1, [], "", "") + + for line in block_lines: + if line.startswith("version"): + for part in line.split(","): + part = part.strip().split(" ") + if part[0] == "version": res.version = int(part[1]) + elif part[0] == "algo": res.algo = part[1] + elif part[0] == "created": res.created = int(part[1]) + elif part[0] == "expires": res.expires = int(part[1]) + elif line.startswith("pkey") or line.startswith("skey"): + match = re.match(r"^([^[]*)\[([^]]*)\]: (.*)$", line) + assert match is not None + res.keys.append((match.group(1), int(match.group(2)), match.group(3))) + elif line.startswith("checksum"): + res.checksum = line.split(" ")[1] + elif line.startswith("keyid"): + res.keyid = line.split(" ")[1] + else: + raise Exception("Unrecognised line in packet") + + return res + + elif typestr == "user ID packet": + return UserIDPacket(extra_text[1:-1]) + + elif typestr == "signature packet": + res = SignaturePacket(-1, "", -1, -1, -1, -1, (-1, ""), [], [], []) + + for line in block_lines: + if line.startswith("version"): + for part in (line + ", " + extra_text).split(","): + part = part.strip().split(" ") + if part[0] == "version": res.version = int(part[1]) + elif part[0] == "created": res.created = int(part[1]) + elif part[0] == "md5len": res.md5len = int(part[1]) + elif part[0] == "sigclass": res.sigclass = int(part[1], 16) + elif part[0] == "algo": res.algo = int(part[1]) + elif part[0] == "keyid": res.keyid = part[1] + elif line.startswith("digest"): + match = re.match(r"^digest algo (.*), begin of digest (.*)$", line) + res.digest = (int(match.group(1)), match.group(2)) + elif line.startswith("hashed subpkt") or line.startswith("subpkt"): + match = re.match(r"^(?:hashed )?subpkt ([^ ]*) len (?:[^ ]*) \((.*)\)$", line) + res.hashed_subs.append(parse_subpacket(int(match.group(1)), match.group(2))) + elif line.startswith("data"): + match = re.match(r"^data: (.*)$", line) + res.datas.append(match.group(1)) + else: + raise Exception("Unrecognised line in packet") + + return res + + else: + raise Exception("Unrecognised packet type '" + typestr + "'") + + packets = [] + while cursor < len(output): + if len(output[cursor]) == 0: + cursor += 1 + continue + packets.append(interpret_packet(*read_packet())) + + return packets diff --git a/time_utils.cpp b/time_utils.cpp index afd38a6..e8a9bdb 100644 --- a/time_utils.cpp +++ b/time_utils.cpp @@ -41,7 +41,7 @@ std::time_t time_utils::tm_to_utc_unix_timestamp(const std::tm &time) std::time_t second_in_year = 24 * 3600 * day_in_year + second_in_day; - std::time_t seconds_before_year = 24 * 3600 * days_since_unix_epoch(1900 + time.tm_year); + std::time_t seconds_before_year = 24 * 3600 * static_cast(days_since_unix_epoch(1900 + time.tm_year)); return seconds_before_year + second_in_year; }