From adb73c8b36a4145356477d3fd41cf70a3ae5fe71 Mon Sep 17 00:00:00 2001 From: davidovski Date: Sun, 21 Nov 2021 17:29:14 +0000 Subject: added signature verification and best source selection --- src/options.py | 31 +++++----- src/util.py | 53 +++++++++++++++-- src/verbs/install.py | 156 +++++++++++++++++++++++++++++++++++++++------------ src/xi.py | 2 +- xipkg.conf | 1 + 5 files changed, 188 insertions(+), 55 deletions(-) diff --git a/src/options.py b/src/options.py index 499f9e7..011c039 100644 --- a/src/options.py +++ b/src/options.py @@ -61,27 +61,32 @@ def parse_args(): arg = args[index] if len(arg) > 1 and arg[0] == "-": - option = None + option = [] # is a named argument with a -- if arg[1] == "-" and len(arg) > 2 and arg[2:].split("=")[0] in names: - option = names[arg[2:].split("=")[0]] + option.appen(names[arg[2:].split("=")[0]]) # is a single letter argument with a - - elif arg[1] in options: - option = arg[1] else: - parsed["args"].append(arg) + for letter in arg[1:]: + if letter in options: + option.append(letter) + + if len(option) == 0: + parsed["args"].append(arg) + # add the option and any values ot the parsed dict - if option is not None: - if options[option]["flag"]: - parsed[option] = True - else: - if "=" in arg: - parsed[option] = arg.split("=")[1] + for opt in option: + if opt is not None: + if options[opt]["flag"]: + parsed[opt] = True else: - index += 1 - parsed[option] = args[index] + if "=" in arg: + parsed[opt] = arg.split("=")[1] + else: + index += 1 + parsed[opt] = args[index] else: parsed["args"].append(arg) diff --git a/src/util.py b/src/util.py index 99bb82e..cef5529 100644 --- a/src/util.py +++ b/src/util.py @@ -8,8 +8,11 @@ import hashlib DEFAULT_BAR_COLOR = colors.BLACK + colors.BG_CYAN DEFAULT_BAR_COLOR_RESET = colors.BG_BLACK + colors.CYAN -def add_path(a, b): - return a + (b if a[-1] == "/" else f"/{b}") +def add_path(*argv): + a = argv[0] + for b in argv[1:]: + a = a + (b if a[-1] == "/" else f"/{b}") + return a def loading_bar(completed, total, text, unit="", color=DEFAULT_BAR_COLOR, reset=DEFAULT_BAR_COLOR_RESET): @@ -29,19 +32,57 @@ def loading_bar(completed, total, text, def print_reset(text): print(colors.RESET + text) -def curl(url): +def curl(url, raw=False): try: r = requests.get(url) except: return 500, "" - return r.status_code, r.text + return r.status_code, r.content if raw else r.text + +def get_unit(n): + base = 1000 + if n > base**4: return base**4, "TB" + elif n > base**3: return base**3, "GB" + elif n > base**2: return base**2, "MB" + elif n > base**1: return base**1, "KB" + +def curl_to_file(url, path, text=""): + with requests.get(url, stream=True) as r: + r.raise_for_status() + length = int(r.headers['content-length']) + with open(path, "wb") as f: + + c_size = 4096 + ic = r.iter_content(chunk_size=c_size) + done = 0 + + for chunk in ic: + if text: + divisor, unit = get_unit(length) + loading_bar(int(done/divisor), int(length/divisor), "Downloading " + text, unit=unit) + + f.write(chunk) + done += c_size + if text: + divisor, unit = get_unit(length) + loading_bar(int(done/divisor), int(length/divisor), "Downloaded " + text, unit=unit) + print(colors.RESET) + + return r.status_code, path + def mkdir(path): if not os.path.exists(path): os.makedirs(path) -def md5sum(data): - return hashlib.md5(data) +def md5sum(filename): + md5_hash = hashlib.md5() + + with open(filename,"rb") as f: + for byte_block in iter(lambda: f.read(4096),b""): + md5_hash.update(byte_block) + + return md5_hash.hexdigest() def ask_confirmation(text, default=True, no_confirm=False): yes = "Y" if default else "y" diff --git a/src/verbs/install.py b/src/verbs/install.py index 2026ab4..b5555a7 100644 --- a/src/verbs/install.py +++ b/src/verbs/install.py @@ -3,7 +3,23 @@ import re import util import colors import time - +import requests +import hashlib + +def get_best_source(exclude=[], sources_list="/var/lib/xipkg/sources"): + # TODO implement exclude + source_speeds = {} + with open(sources_list, "r") as file: + for line in file.readlines(): + split = line.split(" ") + if len(split) > 0: + try: + source_speeds[split[0]] = float(split[1]) + except: + pass + + return sorted(source_speeds.keys(), key=lambda k: source_speeds[k]) + def find_package(query, repos, packages_dir, sources): for repo in repos: @@ -24,14 +40,44 @@ def find_package(query, repos, packages_dir, sources): return None, [], None -def retrieve_package_info(sources, checksum, package_name, +def verify_signature(package_file, package_info, + cache_dir="/var/cache/xipkg", keychain_dir="/var/lib/xipkg/keychain", + verbose=False): + + checksum = package_info["CHECKSUM"] + + sig_cached_path = util.add_path(cache_dir, checksum + ".sig") + with open(sig_cached_path, "wb") as file: + file.write(package_info["SIGNATURE"]) + + keys = os.listdir(keychain_dir) + for key in keys: + key_path = util.add_path(keychain_dir, key) + + command = f"openssl dgst -verify {key_path} -signature {sig_cached_path} {package_file}" + + if "OK" in os.popen(command).read(): + return True + elif verbose: + print(colors.RED + + f"Failed to verify signature against {key}" + + colors.RESET) + + return False + +def retrieve_package_info(sources, checksum, package_name, config, verbose=False, skip_verification=False): + + sources_list=config["dir"]["sources"] + cache_dir=config["dir"]["cache"] # TODO we may potentially do this a few times while resolving deps, might want to cache things here # TODO actually use the ping times we made earlier to decide which source to pick - for source,url in sources.items(): + for source in get_best_source(sources_list=sources_list): + url = sources[source] + package_info_url = util.add_path(url, package_name + ".xipkg.info") - status, response = util.curl(package_info_url) + status, response = util.curl(package_info_url, raw=True) if status == 200: info = parse_package_info(response) @@ -46,40 +92,69 @@ def retrieve_package_info(sources, checksum, package_name, print(colors.RED + f"No matching hashes found" + colors.RESET) return {} -def retrieve_package(sources, checksum, package_name, +def retrieve_package(sources, package_info, package_name, config, verbose=False, skip_verification=False): + sources_list=config["dir"]["sources"] + cache_dir=config["dir"]["cache"] + keychain_dir=config["dir"]["keychain"] + # TODO actually use the ping times we made earlier to decide which source to pick # TODO actually save tar file, and add loading bar - for source,url in sources.items(): - package_info_url = util.add_path(url, package_name + ".xipkg") - status, response = util.curl(package_info_url) - + + checksum = package_info["CHECKSUM"] + + for source in get_best_source(sources_list=sources_list): + url = sources[source] + package_url = util.add_path(url, package_name + ".xipkg") + package_dir = util.add_path(cache_dir, source) + + util.mkdir(package_dir) + status, package_path = util.curl_to_file(package_url, util.add_path(package_dir, package_name + ".xipkg"), text=package_name + ".xipkg") + if status == 200: - downloaded_checksum = util.md5sum(response) - print(downloaded_checksum, "compared to requested", checksum) - if downloaded_checksum == checksum or skip_verification: - return reponse + downloaded_checksum = util.md5sum(package_path) + + if not skip_verification: + if downloaded_checksum == checksum: + + if verify_signature(package_path, package_info, + cache_dir=cache_dir, keychain_dir=keychain_dir, verbose=verbose): + return package_path + elif verbose: + print(colors.RED + + f"Failed to verify signature for {package_name} in {source}" + + colors.RESET) + elif verbose: + print(colors.RED + + f"Checksum verification failed for {package_name} in {source}" + + colors.RESET) else: - if verbose: - print(colors.RED - + f"Checksum verification failed for {package_name} in {source}" - + colors.RESET) + return package_path if verbose: - print(colors.RED + f"No matching hashes found" + colors.RESET) - return {} + print(colors.RED + f"No valid packages found" + colors.RESET) + return "" def parse_package_info(packageinfo): info = {} + lines = packageinfo.split(b"\n") - for line in packageinfo.split("\n"): - split = line.split("=") + index = 0 + while index < len(lines): + line = lines[index] + split = line.split(b"=") if len(split) > 1: - info[split[0]] = "=".join(split[1:]) - + if split[0] == b"SIGNATURE": + index += 1 + digest = b"\n".join(lines[index:]) + info["SIGNATURE"] = digest + break; + else: + info[str(split[0], "utf-8")] = str(b"=".join(split[1:]), "utf-8") + index += 1 return info -def resolve_dependencies(package_info, config): +def resolve_dependencies(package_info): getpkgs = lambda deps: re.findall("\w*", deps) deps = getpkgs(package_info["DEPS"]) @@ -101,20 +176,20 @@ def find_all_dependencies(package_names, options, config): dep_checksum, dep_sources, dep_repo = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"]) if dep_checksum is not None: info = retrieve_package_info( - dep_sources, dep_checksum, dep, + dep_sources, dep_checksum, dep, config, verbose=options["v"], skip_verification=options["u"] ) if len(info) > 0: - all_deps.append(dep) - deps = resolve_dependencies(info, config) - for dep in deps: - if not dep in all_deps: - - if is_installed(dep, config): - print(colors.YELLOW + f"Package {query} has already been installed") - else: - to_check.append(dep) + if not dep in all_deps: + all_deps.append(dep) + deps = resolve_dependencies(info) + for dep in deps: + if not dep in all_deps: + if is_installed(dep, config): + print(colors.YELLOW + f"Package {query} has already been installed") + else: + to_check.append(dep) else: if options["v"]: util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to retrieve info for {query}") @@ -154,7 +229,18 @@ def install(args, options, config): print() if util.ask_confirmation(colors.BLUE + "Continue?", no_confirm=options["y"]): - print("installed") + + for package in to_install: + checksum, sources, repo = find_package(package, config["repos"], + config["dir"]["packages"], config["sources"]) + + info = retrieve_package_info( + sources, checksum, package, config, + verbose=v, skip_verification=unsafe + ) + + retrieve_package(sources, info, package, config, + verbose=v, skip_verification=unsafe) else: print(colors.RED + "Action cancelled by user") else: diff --git a/src/xi.py b/src/xi.py index d00c2df..f6089ec 100644 --- a/src/xi.py +++ b/src/xi.py @@ -20,7 +20,7 @@ verbs = { v: globals()[v] for v in [ def main(): opts = options.parse_args() args = opts["args"] - + if opts["h"]: options.print_usage() return diff --git a/xipkg.conf b/xipkg.conf index 705c30d..964a60a 100644 --- a/xipkg.conf +++ b/xipkg.conf @@ -3,6 +3,7 @@ include /etc/xipkg.d/default.conf sources { + #localhost http://localhost:8089/repo/ davidovski https://xi.davidovski.xyz/repo/ codeberg https://xi.codeberg.page/repo/ ftp https://xilinux.ftp.sh/repo/ -- cgit v1.2.1