summaryrefslogtreecommitdiff
path: root/src/verbs/install.py
diff options
context:
space:
mode:
authordavidovski <david@davidovski.xyz>2022-02-13 23:22:33 +0000
committerdavidovski <david@davidovski.xyz>2022-02-13 23:22:33 +0000
commit9f529b0e85c7b38e97d8ebb0371f7a6859f882f4 (patch)
tree8de89a716bce066a5497b8263a00d9c007425e85 /src/verbs/install.py
parentf545930c0535293a37f5c1730d8b83264cf098b5 (diff)
started rewrite with xisync
Diffstat (limited to 'src/verbs/install.py')
-rw-r--r--src/verbs/install.py471
1 files changed, 0 insertions, 471 deletions
diff --git a/src/verbs/install.py b/src/verbs/install.py
deleted file mode 100644
index 0690da3..0000000
--- a/src/verbs/install.py
+++ /dev/null
@@ -1,471 +0,0 @@
-import os
-import re
-import util
-import colors
-import time
-import requests
-import hashlib
-
-from verbs.sync import sync, run_post_install
-
-def get_best_source(available, sources_list="/var/lib/xipkg/sources"):
- source_speeds = {}
- with open(sources_list, "r") as file:
- for line in file.readlines():
- split = line.split(" ")
- if len(split) > 0:
- try:
- if split[0] in available:
- source_speeds[split[0]] = float(split[1])
- except:
- pass
-
- return sorted(source_speeds.keys(), key=lambda k: source_speeds[k])
-
-
-def find_package(query, repos, packages_dir, sources):
- for repo in repos:
- repo_dir = os.path.join(packages_dir, repo)
- files = os.listdir(repo_dir)
-
- if query in files:
- requested_repo = repo
- with open(os.path.join(repo_dir, query)) as file:
- checksum = file.readline().strip().split("=")[-1]
- size = file.readline().strip().split("=")[-1]
- filecount = file.readline().strip().split("=")[-1]
- listed_sources = file.readline().strip().split("=")[-1].split()
- found_sources = {
- source: util.add_path(url, repo)
- for source, url in sources.items()
- if source in listed_sources
- }
- return checksum, found_sources, requested_repo, int(size)*1000, int(filecount)
- return None, [], None, 0, 0
-
-
-def verify_signature(package_file, package_info,
- cache_dir="/var/cache/xipkg", keychain_dir="/var/lib/xipkg/keychain",
- verbose=False):
-
- checksum = package_info["CHECKSUM"]
-
- sig_cached_path = util.add_path(cache_dir, checksum + ".sig")
- with open(sig_cached_path, "wb") as file:
- file.write(package_info["SIGNATURE"])
-
- if os.path.exists(keychain_dir):
- keys = os.listdir(keychain_dir)
- for key in keys:
- key_path = util.add_path(keychain_dir, key)
-
- command = f"openssl dgst -verify {key_path} -signature {sig_cached_path} {package_file}"
-
- if "OK" in os.popen(command).read():
- return key
- elif verbose:
- print(colors.RED
- + f"Failed to verify signature against {key}"
- + colors.RESET)
-
- elif verbose:
- print(colors.BLACK + "There are no keys to verify with")
- return ""
-
-
-def retrieve_package_info(sources, checksum, package_name, config,
- verbose=False, skip_verification=False):
-
- sources_list=config["dir"]["sources"]
- cache_dir=config["dir"]["cache"]
-
- # TODO we may potentially do this a few times while resolving deps, might want to cache things here
- # TODO or find cached package checksum from the cache folder
- for source in get_best_source(sources, sources_list=sources_list):
- url = sources[source]
-
- package_info_url = util.add_path(url, package_name + ".xipkg.info")
- status, response = util.curl(package_info_url, raw=True)
-
- if status == 200:
- info = parse_package_info(response)
- if info["CHECKSUM"] == checksum or skip_verification:
- return info
- else:
- if verbose:
- print(colors.RED
- + f"Checksum verification failed for {package_name} in {source}"
- + colors.RESET)
- if verbose:
- print(colors.RED + f"No matching hashes found" + colors.RESET)
- return {}
-
-# Does not verify the package itself, will only blindly accept the best size it can
-def query_package_size(sources, package_info, package_name, config, verbose=False):
- sources_list=config["dir"]["sources"]
- for source in get_best_source(sources, sources_list=sources_list):
- url = sources[source]
- if verbose:
- print(colors.LIGHT_BLACK + f"using source {source} at {url} for {package_name}")
-
- package_url = util.add_path(url, package_name + ".xipkg")
- size = util.query_size(package_url)
- if size > 0:
- return size
- return 0
-
-def retrieve_package(sources, package_info, package_name, config, completed=0, total_download=-1,
- verbose=False, skip_verification=False):
-
- sources_list=config["dir"]["sources"]
- cache_dir=config["dir"]["cache"]
- keychain_dir=config["dir"]["keychain"]
-
- checksum = package_info["CHECKSUM"]
-
- for source in get_best_source(sources, sources_list=sources_list):
- url = sources[source]
- if verbose:
- print(colors.LIGHT_BLACK + f"using source {source} at {url}")
- package_url = util.add_path(url, package_name + ".xipkg")
- package_dir = util.add_path(cache_dir, source)
-
- util.mkdir(package_dir)
-
- if total_download == -1:
- text = package_name + ".xipkg"
- else:
- text = "packages..."
-
- # TODO if package already downloaded maybe just use cached version
- status, package_path, size = util.curl_to_file(package_url, util.add_path(package_dir, package_name + ".xipkg"),
- start=completed, total=total_download, text=text)
-
- if status == 200:
- downloaded_checksum = util.md5sum(package_path)
-
- if not skip_verification:
- if downloaded_checksum == checksum:
- sig = verify_signature(package_path, package_info,
- cache_dir=cache_dir, keychain_dir=keychain_dir, verbose=verbose)
- if len(sig) > 0:
- return package_path, source, sig, size
- elif verbose:
- print(colors.RED
- + f"Failed to verify signature for {package_name} in {source}"
- + colors.RESET)
- elif verbose:
- print(colors.RED
- + f"Checksum verification failed for {package_name} in {source}"
- + colors.RESET)
- else:
- return package_path, source, "none", size
- print(colors.RESET + colors.RED + f"No valid packages found for {package_name}" + colors.RESET)
- return "", "", "", 0
-
-def parse_package_info(packageinfo):
- info = {}
- lines = packageinfo.split(b"\n")
-
- index = 0
- while index < len(lines):
- line = lines[index]
- split = line.split(b"=")
- if len(split) > 1:
- if split[0] == b"SIGNATURE":
- index += 1
- digest = b"\n".join(lines[index:])
- info["SIGNATURE"] = digest
- break;
- else:
- info[str(split[0], "utf-8")] = str(b"=".join(split[1:]), "utf-8")
- index += 1
- return info
-
-def get_available_version(package_name, config, root="/"):
- repos = config["repos"]
- packages_dir = config["dir"]["packages"]
- sources = config["sources"]
- checksum, found_sources, requested_repo, size, files = find_package(package_name, repos, packages_dir, sources)
- return checksum
-
-def get_installed_version(package, config, root="/"):
-
- installed_info = util.add_path(root, config["dir"]["installed"], package, "info")
- if os.path.exists(installed_info):
- with open(installed_info) as file:
- for line in file:
- if line.startswith("CHECKSUM"):
- return line.strip().split("=")[-1]
- return None
-
-def update_needed(package, new_checksum, config, root="/"):
- version = get_installed_version(package, config, root)
- return not new_checksum == version
-
-def resolve_dependencies(package_info):
- d = [
- dep
- for dep in re.findall("[\w,-]*", package_info["DEPS"])
- if len(dep) > 0
- ]
- return d
-
-def find_all_dependencies(package_names, options, config):
- # this is all assuming that the order of deps installed doesn't matter
- failed = []
- to_check = [p for p in package_names]
- dependencies = {}
-
- while len(to_check) > 0:
- util.loading_bar(len(dependencies), len(dependencies) + len(to_check), "Resolving dependencies...")
- dep = to_check.pop()
-
- dep_checksum, dep_sources, dep_repo, size, files = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"])
-
- if dep_checksum is not None:
- dependencies[dep] = dep_checksum
-
- info = retrieve_package_info(
- dep_sources, dep_checksum, dep, config,
- verbose=options["v"], skip_verification=options["u"]
- )
-
- if len(info) > 0:
- [to_check.append(d) for d in resolve_dependencies(info) if not (d in dependencies or d in to_check)]
-
- else:
- if not dep in failed: failed.append(dep)
- if options["v"]:
- util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to retrieve info for {dep}")
- else:
- if not dep in failed: failed.append(dep)
- if options["v"]: util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to find package {dep}")
-
- util.loading_bar(len(dependencies), len(dependencies) + len(to_check), "Resolved dependencies")
- print(colors.RESET)
-
- to_install = []
- to_update = []
- for dep,checksum in dependencies.items():
- if not is_installed(dep, config, options["r"]):
- to_install.append(dep)
- elif update_needed(dep, checksum, config, options["r"]):
- to_update.append(dep)
-
- # assuming that the latter packages are core dependencies
- # we can reverse the array to reflect the more important packages to install
- to_install.reverse()
- to_update.reverse()
- return to_install, to_update, failed
-
-def is_installed(package_name, config, root="/"):
- installed_dir = util.add_path(root, config["dir"]["installed"])
- if os.path.exists(installed_dir):
- files = os.listdir(installed_dir)
- return package_name in files
- return False
-
-def install_package(package_name, package_path, package_info,
- repo, source_url, key, post_install,
- config, verbose=False, root="/"):
-
- # TODO loading bar here
- files = util.extract_tar(package_path, root)
- if post_install:
- run_post_install(config, verbose=verbose, root=root)
- save_installed_info(package_name, package_info, files, repo, source_url, key, config, root=root)
- return files
-
-
-
-def save_installed_info(package_name, package_info,
- files, repo, source_url, key,
- config, root=""):
- installed_dir = util.add_path(root, config["dir"]["installed"], package_name)
- util.mkdir(installed_dir)
-
- name = package_info["NAME"]
- description = package_info["DESCRIPTION"] if "DESCRIPTION" in package_info else ""
- installed_checksum = package_info["CHECKSUM"]
- build_date = package_info["DATE"]
- version = package_info["VERSION"]
- installed_date = os.popen("date").read()
-
- package_url = util.add_path(source_url, repo, package_name + ".xipkg")
-
- info_file = util.add_path(installed_dir, "info")
- with open(info_file, "w") as file:
- file.write(f"NAME={name}\n")
- file.write(f"DESCRIPTION={description}\n")
- file.write(f"CHECKSUM={installed_checksum}\n")
- file.write(f"VERSION={version}\n")
- file.write(f"INSTALL_DATE={installed_date}")
- file.write(f"BUILD_DATE={build_date}\n")
- file.write(f"KEY={key}\n")
- file.write(f"URL={package_url}\n")
- file.write(f"REPO={repo}\n")
-
- files_file = util.add_path(installed_dir, "files")
- with open(files_file, "w") as file:
- file.write(files)
-
-
-def install_single(package, options, config, post_install=True, verbose=False, unsafe=False):
- checksum, sources, repo, size, files = find_package(package, config["repos"],
- config["dir"]["packages"], config["sources"])
-
- info = retrieve_package_info(
- sources, checksum, package, config,
- verbose=verbose, skip_verification=unsafe
- )
-
- package_path, source, key = retrieve_package(sources,
- info, package, config,
- verbose=verbose, skip_verification=unsafe)
-
- files = install_package(package, package_path, info,
- repo, sources[source], key, post_install,
- config, verbose=verbose, root=options["r"])
-
-
-def install_multiple(to_install, args, options, config, terminology=("install", "installed", "installing")):
- v = options["v"]
- unsafe = options["u"]
-
- length = 0
- total_files = 0
- infos = []
- for package in to_install:
- util.loading_bar(len(infos), len(to_install), "Preparing Download")
- checksum, sources, repo, size, filecount = find_package(package, config["repos"],
- config["dir"]["packages"], config["sources"])
-
- if checksum != None:
- info = retrieve_package_info(
- sources, checksum, package, config,
- verbose=v, skip_verification=unsafe
- )
-
- # TODO make package_size be written in package info or sync list instead
- length += int(size)
- total_files += int(filecount)
-
- infos.append(
- (package, sources, repo, info)
- )
-
- divisor, unit = util.get_unit(length)
-
- util.loading_bar(len(infos), len(to_install), "Preparing Download")
- print(colors.RESET + colors.CLEAR_LINE, end="\r")
-
- print(colors.WHITE + "Total download size: " + colors.LIGHT_WHITE + str(round(length / divisor, 2)) + unit)
-
- if options["y"] or util.ask_confirmation(colors.WHITE + "Continue?"):
- # TODO try catch over each package in each stage so that we can know if there are errors
-
- downloaded = 0
- pkg_files = []
- for package_info in infos:
- (package, sources, repo, info) = package_info
-
- if options["v"]:
- print(colors.BLACK + f"Fetching {package}")
- package_path, source, key, size = retrieve_package(sources,
- info, package, config,
- completed=downloaded, total_download=length,
- verbose=v, skip_verification=unsafe)
-
- if package_path == "":
- print(colors.RED + f"Failed to download {package}")
- else:
- downloaded += size
-
- pkg_files.append(
- (package, package_path, sources[source], key, repo, info)
- )
-
- util.loading_bar(int(length/divisor), int(length/divisor), "Downloaded packages", unit=unit)
- print(colors.RESET)
-
- extracted = 0
- for f in pkg_files:
- util.loading_bar(extracted, total_files, terminology[2].capitalize() + " files")
-
- (package, package_path, source, key, repo, info) = f
-
- files = install_package(package, package_path, info,
- repo, source, key, options["r"] == "/",
- config, verbose=v, root=options["r"])
- extracted += len(files.split("\n"))
-
- util.loading_bar(extracted, total_files, terminology[1].capitalize() + " files")
- print(colors.RESET)
- else:
- print(colors.RED + "Action cancelled by user")
-
-
-def install(args, options, config):
- if not options["l"]:
- sync(args, options, config)
-
- sources = config["sources"]
- repos = config["repos"]
-
- v = options["v"]
- unsafe = options["u"]
-
- packages_dir = config["dir"]["packages"]
-
- # have some interaction with sudo when necessary rather than always require it
- # this check may need to be done sooner?
- if util.is_root() or options["r"] != "/":
- to_install, to_update, location_failed = args, [], []
- if options["n"]:
- for dep in to_install:
- dep_checksum, dep_sources, dep_repo, size, files = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"])
- if dep_checksum is None:
- to_install.remove(dep)
- location_failed.append(dep)
-
- else:
- to_install, to_update, location_failed = find_all_dependencies(args, options, config)
-
-
- if len(location_failed) > 0:
- print(colors.RED + "Failed to locate the following packages:")
- print(end="\t")
- for d in location_failed:
- print(colors.RED if d in args else colors.LIGHT_RED, d, end="")
- print()
-
- together = []
- [together.append(p) for p in to_install]
- [together.append(p) for p in to_update]
-
- if len(together) > 0:
-
- if len(to_install) > 0:
- print(colors.BLUE + f"The following will be installed:")
- print(end="\t")
- for d in to_install:
- print(colors.BLUE if d in args else colors.LIGHT_BLUE, d, end="")
- print()
- if len(to_update) > 0:
- print(colors.GREEN + f"The following will be updated:")
- print(end="\t")
- for d in to_update:
- print(colors.GREEN if d in args else colors.LIGHT_GREEN, d, end="")
- print()
-
- install_multiple(together, args, options, config)
- else:
- installed = " ".join([arg for arg in args
- if is_installed(arg, config, options["r"])])
- if len(installed) > 0:
- print(colors.CYAN + "Already installed", colors.LIGHT_CYAN + installed)
- else:
- print(colors.LIGHT_BLACK + "Nothing to do")
- else:
- print(colors.RED + "Root is required to install packages")