From 9f529b0e85c7b38e97d8ebb0371f7a6859f882f4 Mon Sep 17 00:00:00 2001 From: davidovski Date: Sun, 13 Feb 2022 23:22:33 +0000 Subject: started rewrite with xisync --- src/verbs/install.py | 471 --------------------------------------------------- 1 file changed, 471 deletions(-) delete mode 100644 src/verbs/install.py (limited to 'src/verbs/install.py') diff --git a/src/verbs/install.py b/src/verbs/install.py deleted file mode 100644 index 0690da3..0000000 --- a/src/verbs/install.py +++ /dev/null @@ -1,471 +0,0 @@ -import os -import re -import util -import colors -import time -import requests -import hashlib - -from verbs.sync import sync, run_post_install - -def get_best_source(available, sources_list="/var/lib/xipkg/sources"): - source_speeds = {} - with open(sources_list, "r") as file: - for line in file.readlines(): - split = line.split(" ") - if len(split) > 0: - try: - if split[0] in available: - source_speeds[split[0]] = float(split[1]) - except: - pass - - return sorted(source_speeds.keys(), key=lambda k: source_speeds[k]) - - -def find_package(query, repos, packages_dir, sources): - for repo in repos: - repo_dir = os.path.join(packages_dir, repo) - files = os.listdir(repo_dir) - - if query in files: - requested_repo = repo - with open(os.path.join(repo_dir, query)) as file: - checksum = file.readline().strip().split("=")[-1] - size = file.readline().strip().split("=")[-1] - filecount = file.readline().strip().split("=")[-1] - listed_sources = file.readline().strip().split("=")[-1].split() - found_sources = { - source: util.add_path(url, repo) - for source, url in sources.items() - if source in listed_sources - } - return checksum, found_sources, requested_repo, int(size)*1000, int(filecount) - return None, [], None, 0, 0 - - -def verify_signature(package_file, package_info, - cache_dir="/var/cache/xipkg", keychain_dir="/var/lib/xipkg/keychain", - verbose=False): - - checksum = package_info["CHECKSUM"] - - sig_cached_path = util.add_path(cache_dir, checksum + ".sig") - with open(sig_cached_path, "wb") as file: - file.write(package_info["SIGNATURE"]) - - if os.path.exists(keychain_dir): - keys = os.listdir(keychain_dir) - for key in keys: - key_path = util.add_path(keychain_dir, key) - - command = f"openssl dgst -verify {key_path} -signature {sig_cached_path} {package_file}" - - if "OK" in os.popen(command).read(): - return key - elif verbose: - print(colors.RED - + f"Failed to verify signature against {key}" - + colors.RESET) - - elif verbose: - print(colors.BLACK + "There are no keys to verify with") - return "" - - -def retrieve_package_info(sources, checksum, package_name, config, - verbose=False, skip_verification=False): - - sources_list=config["dir"]["sources"] - cache_dir=config["dir"]["cache"] - - # TODO we may potentially do this a few times while resolving deps, might want to cache things here - # TODO or find cached package checksum from the cache folder - for source in get_best_source(sources, sources_list=sources_list): - url = sources[source] - - package_info_url = util.add_path(url, package_name + ".xipkg.info") - status, response = util.curl(package_info_url, raw=True) - - if status == 200: - info = parse_package_info(response) - if info["CHECKSUM"] == checksum or skip_verification: - return info - else: - if verbose: - print(colors.RED - + f"Checksum verification failed for {package_name} in {source}" - + colors.RESET) - if verbose: - print(colors.RED + f"No matching hashes found" + colors.RESET) - return {} - -# Does not verify the package itself, will only blindly accept the best size it can -def query_package_size(sources, package_info, package_name, config, verbose=False): - sources_list=config["dir"]["sources"] - for source in get_best_source(sources, sources_list=sources_list): - url = sources[source] - if verbose: - print(colors.LIGHT_BLACK + f"using source {source} at {url} for {package_name}") - - package_url = util.add_path(url, package_name + ".xipkg") - size = util.query_size(package_url) - if size > 0: - return size - return 0 - -def retrieve_package(sources, package_info, package_name, config, completed=0, total_download=-1, - verbose=False, skip_verification=False): - - sources_list=config["dir"]["sources"] - cache_dir=config["dir"]["cache"] - keychain_dir=config["dir"]["keychain"] - - checksum = package_info["CHECKSUM"] - - for source in get_best_source(sources, sources_list=sources_list): - url = sources[source] - if verbose: - print(colors.LIGHT_BLACK + f"using source {source} at {url}") - package_url = util.add_path(url, package_name + ".xipkg") - package_dir = util.add_path(cache_dir, source) - - util.mkdir(package_dir) - - if total_download == -1: - text = package_name + ".xipkg" - else: - text = "packages..." - - # TODO if package already downloaded maybe just use cached version - status, package_path, size = util.curl_to_file(package_url, util.add_path(package_dir, package_name + ".xipkg"), - start=completed, total=total_download, text=text) - - if status == 200: - downloaded_checksum = util.md5sum(package_path) - - if not skip_verification: - if downloaded_checksum == checksum: - sig = verify_signature(package_path, package_info, - cache_dir=cache_dir, keychain_dir=keychain_dir, verbose=verbose) - if len(sig) > 0: - return package_path, source, sig, size - elif verbose: - print(colors.RED - + f"Failed to verify signature for {package_name} in {source}" - + colors.RESET) - elif verbose: - print(colors.RED - + f"Checksum verification failed for {package_name} in {source}" - + colors.RESET) - else: - return package_path, source, "none", size - print(colors.RESET + colors.RED + f"No valid packages found for {package_name}" + colors.RESET) - return "", "", "", 0 - -def parse_package_info(packageinfo): - info = {} - lines = packageinfo.split(b"\n") - - index = 0 - while index < len(lines): - line = lines[index] - split = line.split(b"=") - if len(split) > 1: - if split[0] == b"SIGNATURE": - index += 1 - digest = b"\n".join(lines[index:]) - info["SIGNATURE"] = digest - break; - else: - info[str(split[0], "utf-8")] = str(b"=".join(split[1:]), "utf-8") - index += 1 - return info - -def get_available_version(package_name, config, root="/"): - repos = config["repos"] - packages_dir = config["dir"]["packages"] - sources = config["sources"] - checksum, found_sources, requested_repo, size, files = find_package(package_name, repos, packages_dir, sources) - return checksum - -def get_installed_version(package, config, root="/"): - - installed_info = util.add_path(root, config["dir"]["installed"], package, "info") - if os.path.exists(installed_info): - with open(installed_info) as file: - for line in file: - if line.startswith("CHECKSUM"): - return line.strip().split("=")[-1] - return None - -def update_needed(package, new_checksum, config, root="/"): - version = get_installed_version(package, config, root) - return not new_checksum == version - -def resolve_dependencies(package_info): - d = [ - dep - for dep in re.findall("[\w,-]*", package_info["DEPS"]) - if len(dep) > 0 - ] - return d - -def find_all_dependencies(package_names, options, config): - # this is all assuming that the order of deps installed doesn't matter - failed = [] - to_check = [p for p in package_names] - dependencies = {} - - while len(to_check) > 0: - util.loading_bar(len(dependencies), len(dependencies) + len(to_check), "Resolving dependencies...") - dep = to_check.pop() - - dep_checksum, dep_sources, dep_repo, size, files = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"]) - - if dep_checksum is not None: - dependencies[dep] = dep_checksum - - info = retrieve_package_info( - dep_sources, dep_checksum, dep, config, - verbose=options["v"], skip_verification=options["u"] - ) - - if len(info) > 0: - [to_check.append(d) for d in resolve_dependencies(info) if not (d in dependencies or d in to_check)] - - else: - if not dep in failed: failed.append(dep) - if options["v"]: - util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to retrieve info for {dep}") - else: - if not dep in failed: failed.append(dep) - if options["v"]: util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to find package {dep}") - - util.loading_bar(len(dependencies), len(dependencies) + len(to_check), "Resolved dependencies") - print(colors.RESET) - - to_install = [] - to_update = [] - for dep,checksum in dependencies.items(): - if not is_installed(dep, config, options["r"]): - to_install.append(dep) - elif update_needed(dep, checksum, config, options["r"]): - to_update.append(dep) - - # assuming that the latter packages are core dependencies - # we can reverse the array to reflect the more important packages to install - to_install.reverse() - to_update.reverse() - return to_install, to_update, failed - -def is_installed(package_name, config, root="/"): - installed_dir = util.add_path(root, config["dir"]["installed"]) - if os.path.exists(installed_dir): - files = os.listdir(installed_dir) - return package_name in files - return False - -def install_package(package_name, package_path, package_info, - repo, source_url, key, post_install, - config, verbose=False, root="/"): - - # TODO loading bar here - files = util.extract_tar(package_path, root) - if post_install: - run_post_install(config, verbose=verbose, root=root) - save_installed_info(package_name, package_info, files, repo, source_url, key, config, root=root) - return files - - - -def save_installed_info(package_name, package_info, - files, repo, source_url, key, - config, root=""): - installed_dir = util.add_path(root, config["dir"]["installed"], package_name) - util.mkdir(installed_dir) - - name = package_info["NAME"] - description = package_info["DESCRIPTION"] if "DESCRIPTION" in package_info else "" - installed_checksum = package_info["CHECKSUM"] - build_date = package_info["DATE"] - version = package_info["VERSION"] - installed_date = os.popen("date").read() - - package_url = util.add_path(source_url, repo, package_name + ".xipkg") - - info_file = util.add_path(installed_dir, "info") - with open(info_file, "w") as file: - file.write(f"NAME={name}\n") - file.write(f"DESCRIPTION={description}\n") - file.write(f"CHECKSUM={installed_checksum}\n") - file.write(f"VERSION={version}\n") - file.write(f"INSTALL_DATE={installed_date}") - file.write(f"BUILD_DATE={build_date}\n") - file.write(f"KEY={key}\n") - file.write(f"URL={package_url}\n") - file.write(f"REPO={repo}\n") - - files_file = util.add_path(installed_dir, "files") - with open(files_file, "w") as file: - file.write(files) - - -def install_single(package, options, config, post_install=True, verbose=False, unsafe=False): - checksum, sources, repo, size, files = find_package(package, config["repos"], - config["dir"]["packages"], config["sources"]) - - info = retrieve_package_info( - sources, checksum, package, config, - verbose=verbose, skip_verification=unsafe - ) - - package_path, source, key = retrieve_package(sources, - info, package, config, - verbose=verbose, skip_verification=unsafe) - - files = install_package(package, package_path, info, - repo, sources[source], key, post_install, - config, verbose=verbose, root=options["r"]) - - -def install_multiple(to_install, args, options, config, terminology=("install", "installed", "installing")): - v = options["v"] - unsafe = options["u"] - - length = 0 - total_files = 0 - infos = [] - for package in to_install: - util.loading_bar(len(infos), len(to_install), "Preparing Download") - checksum, sources, repo, size, filecount = find_package(package, config["repos"], - config["dir"]["packages"], config["sources"]) - - if checksum != None: - info = retrieve_package_info( - sources, checksum, package, config, - verbose=v, skip_verification=unsafe - ) - - # TODO make package_size be written in package info or sync list instead - length += int(size) - total_files += int(filecount) - - infos.append( - (package, sources, repo, info) - ) - - divisor, unit = util.get_unit(length) - - util.loading_bar(len(infos), len(to_install), "Preparing Download") - print(colors.RESET + colors.CLEAR_LINE, end="\r") - - print(colors.WHITE + "Total download size: " + colors.LIGHT_WHITE + str(round(length / divisor, 2)) + unit) - - if options["y"] or util.ask_confirmation(colors.WHITE + "Continue?"): - # TODO try catch over each package in each stage so that we can know if there are errors - - downloaded = 0 - pkg_files = [] - for package_info in infos: - (package, sources, repo, info) = package_info - - if options["v"]: - print(colors.BLACK + f"Fetching {package}") - package_path, source, key, size = retrieve_package(sources, - info, package, config, - completed=downloaded, total_download=length, - verbose=v, skip_verification=unsafe) - - if package_path == "": - print(colors.RED + f"Failed to download {package}") - else: - downloaded += size - - pkg_files.append( - (package, package_path, sources[source], key, repo, info) - ) - - util.loading_bar(int(length/divisor), int(length/divisor), "Downloaded packages", unit=unit) - print(colors.RESET) - - extracted = 0 - for f in pkg_files: - util.loading_bar(extracted, total_files, terminology[2].capitalize() + " files") - - (package, package_path, source, key, repo, info) = f - - files = install_package(package, package_path, info, - repo, source, key, options["r"] == "/", - config, verbose=v, root=options["r"]) - extracted += len(files.split("\n")) - - util.loading_bar(extracted, total_files, terminology[1].capitalize() + " files") - print(colors.RESET) - else: - print(colors.RED + "Action cancelled by user") - - -def install(args, options, config): - if not options["l"]: - sync(args, options, config) - - sources = config["sources"] - repos = config["repos"] - - v = options["v"] - unsafe = options["u"] - - packages_dir = config["dir"]["packages"] - - # have some interaction with sudo when necessary rather than always require it - # this check may need to be done sooner? - if util.is_root() or options["r"] != "/": - to_install, to_update, location_failed = args, [], [] - if options["n"]: - for dep in to_install: - dep_checksum, dep_sources, dep_repo, size, files = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"]) - if dep_checksum is None: - to_install.remove(dep) - location_failed.append(dep) - - else: - to_install, to_update, location_failed = find_all_dependencies(args, options, config) - - - if len(location_failed) > 0: - print(colors.RED + "Failed to locate the following packages:") - print(end="\t") - for d in location_failed: - print(colors.RED if d in args else colors.LIGHT_RED, d, end="") - print() - - together = [] - [together.append(p) for p in to_install] - [together.append(p) for p in to_update] - - if len(together) > 0: - - if len(to_install) > 0: - print(colors.BLUE + f"The following will be installed:") - print(end="\t") - for d in to_install: - print(colors.BLUE if d in args else colors.LIGHT_BLUE, d, end="") - print() - if len(to_update) > 0: - print(colors.GREEN + f"The following will be updated:") - print(end="\t") - for d in to_update: - print(colors.GREEN if d in args else colors.LIGHT_GREEN, d, end="") - print() - - install_multiple(together, args, options, config) - else: - installed = " ".join([arg for arg in args - if is_installed(arg, config, options["r"])]) - if len(installed) > 0: - print(colors.CYAN + "Already installed", colors.LIGHT_CYAN + installed) - else: - print(colors.LIGHT_BLACK + "Nothing to do") - else: - print(colors.RED + "Root is required to install packages") -- cgit v1.2.1