From adb73c8b36a4145356477d3fd41cf70a3ae5fe71 Mon Sep 17 00:00:00 2001 From: davidovski Date: Sun, 21 Nov 2021 17:29:14 +0000 Subject: added signature verification and best source selection --- src/verbs/install.py | 156 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 121 insertions(+), 35 deletions(-) (limited to 'src/verbs') diff --git a/src/verbs/install.py b/src/verbs/install.py index 2026ab4..b5555a7 100644 --- a/src/verbs/install.py +++ b/src/verbs/install.py @@ -3,7 +3,23 @@ import re import util import colors import time - +import requests +import hashlib + +def get_best_source(exclude=[], sources_list="/var/lib/xipkg/sources"): + # TODO implement exclude + source_speeds = {} + with open(sources_list, "r") as file: + for line in file.readlines(): + split = line.split(" ") + if len(split) > 0: + try: + source_speeds[split[0]] = float(split[1]) + except: + pass + + return sorted(source_speeds.keys(), key=lambda k: source_speeds[k]) + def find_package(query, repos, packages_dir, sources): for repo in repos: @@ -24,14 +40,44 @@ def find_package(query, repos, packages_dir, sources): return None, [], None -def retrieve_package_info(sources, checksum, package_name, +def verify_signature(package_file, package_info, + cache_dir="/var/cache/xipkg", keychain_dir="/var/lib/xipkg/keychain", + verbose=False): + + checksum = package_info["CHECKSUM"] + + sig_cached_path = util.add_path(cache_dir, checksum + ".sig") + with open(sig_cached_path, "wb") as file: + file.write(package_info["SIGNATURE"]) + + keys = os.listdir(keychain_dir) + for key in keys: + key_path = util.add_path(keychain_dir, key) + + command = f"openssl dgst -verify {key_path} -signature {sig_cached_path} {package_file}" + + if "OK" in os.popen(command).read(): + return True + elif verbose: + print(colors.RED + + f"Failed to verify signature against {key}" + + colors.RESET) + + return False + +def retrieve_package_info(sources, checksum, package_name, config, verbose=False, skip_verification=False): + + sources_list=config["dir"]["sources"] + cache_dir=config["dir"]["cache"] # TODO we may potentially do this a few times while resolving deps, might want to cache things here # TODO actually use the ping times we made earlier to decide which source to pick - for source,url in sources.items(): + for source in get_best_source(sources_list=sources_list): + url = sources[source] + package_info_url = util.add_path(url, package_name + ".xipkg.info") - status, response = util.curl(package_info_url) + status, response = util.curl(package_info_url, raw=True) if status == 200: info = parse_package_info(response) @@ -46,40 +92,69 @@ def retrieve_package_info(sources, checksum, package_name, print(colors.RED + f"No matching hashes found" + colors.RESET) return {} -def retrieve_package(sources, checksum, package_name, +def retrieve_package(sources, package_info, package_name, config, verbose=False, skip_verification=False): + sources_list=config["dir"]["sources"] + cache_dir=config["dir"]["cache"] + keychain_dir=config["dir"]["keychain"] + # TODO actually use the ping times we made earlier to decide which source to pick # TODO actually save tar file, and add loading bar - for source,url in sources.items(): - package_info_url = util.add_path(url, package_name + ".xipkg") - status, response = util.curl(package_info_url) - + + checksum = package_info["CHECKSUM"] + + for source in get_best_source(sources_list=sources_list): + url = sources[source] + package_url = util.add_path(url, package_name + ".xipkg") + package_dir = util.add_path(cache_dir, source) + + util.mkdir(package_dir) + status, package_path = util.curl_to_file(package_url, util.add_path(package_dir, package_name + ".xipkg"), text=package_name + ".xipkg") + if status == 200: - downloaded_checksum = util.md5sum(response) - print(downloaded_checksum, "compared to requested", checksum) - if downloaded_checksum == checksum or skip_verification: - return reponse + downloaded_checksum = util.md5sum(package_path) + + if not skip_verification: + if downloaded_checksum == checksum: + + if verify_signature(package_path, package_info, + cache_dir=cache_dir, keychain_dir=keychain_dir, verbose=verbose): + return package_path + elif verbose: + print(colors.RED + + f"Failed to verify signature for {package_name} in {source}" + + colors.RESET) + elif verbose: + print(colors.RED + + f"Checksum verification failed for {package_name} in {source}" + + colors.RESET) else: - if verbose: - print(colors.RED - + f"Checksum verification failed for {package_name} in {source}" - + colors.RESET) + return package_path if verbose: - print(colors.RED + f"No matching hashes found" + colors.RESET) - return {} + print(colors.RED + f"No valid packages found" + colors.RESET) + return "" def parse_package_info(packageinfo): info = {} + lines = packageinfo.split(b"\n") - for line in packageinfo.split("\n"): - split = line.split("=") + index = 0 + while index < len(lines): + line = lines[index] + split = line.split(b"=") if len(split) > 1: - info[split[0]] = "=".join(split[1:]) - + if split[0] == b"SIGNATURE": + index += 1 + digest = b"\n".join(lines[index:]) + info["SIGNATURE"] = digest + break; + else: + info[str(split[0], "utf-8")] = str(b"=".join(split[1:]), "utf-8") + index += 1 return info -def resolve_dependencies(package_info, config): +def resolve_dependencies(package_info): getpkgs = lambda deps: re.findall("\w*", deps) deps = getpkgs(package_info["DEPS"]) @@ -101,20 +176,20 @@ def find_all_dependencies(package_names, options, config): dep_checksum, dep_sources, dep_repo = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"]) if dep_checksum is not None: info = retrieve_package_info( - dep_sources, dep_checksum, dep, + dep_sources, dep_checksum, dep, config, verbose=options["v"], skip_verification=options["u"] ) if len(info) > 0: - all_deps.append(dep) - deps = resolve_dependencies(info, config) - for dep in deps: - if not dep in all_deps: - - if is_installed(dep, config): - print(colors.YELLOW + f"Package {query} has already been installed") - else: - to_check.append(dep) + if not dep in all_deps: + all_deps.append(dep) + deps = resolve_dependencies(info) + for dep in deps: + if not dep in all_deps: + if is_installed(dep, config): + print(colors.YELLOW + f"Package {query} has already been installed") + else: + to_check.append(dep) else: if options["v"]: util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to retrieve info for {query}") @@ -154,7 +229,18 @@ def install(args, options, config): print() if util.ask_confirmation(colors.BLUE + "Continue?", no_confirm=options["y"]): - print("installed") + + for package in to_install: + checksum, sources, repo = find_package(package, config["repos"], + config["dir"]["packages"], config["sources"]) + + info = retrieve_package_info( + sources, checksum, package, config, + verbose=v, skip_verification=unsafe + ) + + retrieve_package(sources, info, package, config, + verbose=v, skip_verification=unsafe) else: print(colors.RED + "Action cancelled by user") else: -- cgit v1.2.1