diff options
author | davidovski <david@davidovski.xyz> | 2022-02-13 23:22:33 +0000 |
---|---|---|
committer | davidovski <david@davidovski.xyz> | 2022-02-13 23:22:33 +0000 |
commit | 9f529b0e85c7b38e97d8ebb0371f7a6859f882f4 (patch) | |
tree | 8de89a716bce066a5497b8263a00d9c007425e85 /src/verbs | |
parent | f545930c0535293a37f5c1730d8b83264cf098b5 (diff) |
started rewrite with xisync
Diffstat (limited to 'src/verbs')
-rw-r--r-- | src/verbs/__init__.py | 0 | ||||
-rw-r--r-- | src/verbs/file.py | 62 | ||||
-rw-r--r-- | src/verbs/files.py | 32 | ||||
-rw-r--r-- | src/verbs/info.py | 71 | ||||
-rw-r--r-- | src/verbs/install.py | 471 | ||||
-rw-r--r-- | src/verbs/remove.py | 63 | ||||
-rw-r--r-- | src/verbs/search.py | 40 | ||||
-rw-r--r-- | src/verbs/sync.py | 258 | ||||
-rw-r--r-- | src/verbs/update.py | 25 |
9 files changed, 0 insertions, 1022 deletions
diff --git a/src/verbs/__init__.py b/src/verbs/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/src/verbs/__init__.py +++ /dev/null diff --git a/src/verbs/file.py b/src/verbs/file.py deleted file mode 100644 index 008635f..0000000 --- a/src/verbs/file.py +++ /dev/null @@ -1,62 +0,0 @@ -import os -import colors -import util -import shutil - -import re - -from verbs.sync import sync -from verbs.search import list_repos - -# since we symlink /bin to /usr, we should make sure we are always looking for the same place -def condition_file(file_path): - file_path = re.sub("^/bin", "/usr/bin", file_path) - file_path = re.sub("^/sbin", "/usr/bin", file_path) - file_path = re.sub("^/usr/sbin", "/usr/bin", file_path) - file_path = re.sub("^/lib", "/usr/lib", file_path) - file_path = re.sub("^/lib64", "/usr/lib", file_path) - file_path = re.sub("^/usr/lib64", "/usr/lib", file_path) - return file_path - -def absolute_path(file_path, root="/"): - if file_path[0] == "/": - return file_path - else: - root_path = os.path.realpath(root) - file_path = os.path.realpath(file_path) - # this is a bad way of doing this - file_path = file_path.replace(root_path, "") - return file_path - -def list_files(package_name, config, root="/"): - file_list = util.add_path(root, config["dir"]["installed"], package_name, "files") - if os.path.exists(file_list): - with open(file_list, "r") as file: - return [condition_file(line.strip()) for line in file] - else: - return [] - -def list_all_files(config, root="/"): - packages = [ p.split("/")[-1] for p in list_repos(config["repos"], config["dir"]["packages"], config["dir"]["sources"])] - file_list = {} - for package in packages: - file_list[package] = list_files(package, config, root=root) - return file_list - -def file(args, options, config): - if len(args) > 0: - file_list = list_all_files(config, options["r"]) - for file in args: - file = condition_file(absolute_path(file, options["r"])) - found = False - for package, files in file_list.items(): - if file in files: - found = True - print(colors.LIGHT_CYAN + file, colors.CYAN + "belongs to", colors.LIGHT_CYAN + package) - break - if not found: - print(colors.RED + "Could not determine which package owns " + colors.LIGHT_CYAN + file) - - - else: - print(colors.LIGHT_RED + "Nothing to do") diff --git a/src/verbs/files.py b/src/verbs/files.py deleted file mode 100644 index 33936a9..0000000 --- a/src/verbs/files.py +++ /dev/null @@ -1,32 +0,0 @@ -import os -import colors -import util -import shutil - -import re - -from verbs.sync import sync -from verbs.search import list_repos -from verbs.file import condition_file - -def list_files(package_name, config, root="/"): - file_list = util.add_path(root, config["dir"]["installed"], package_name, "files") - if os.path.exists(file_list): - with open(file_list, "r") as file: - return [condition_file(line.strip()) for line in file] - else: - return [] - -def list_all_files(config, root="/"): - packages = [ p.split("/")[-1] for p in list_repos(config["repos"], config["dir"]["packages"], config["dir"]["sources"])] - file_list = {} - for package in packages: - file_list[package] = list_files(package, config, root=root) - return file_list - -def files(args, options, config): - if len(args) > 0: - [print(f) for f in list_files(args[0], config, options["r"])] - - else: - print(colors.LIGHT_RED + "Nothing to do") diff --git a/src/verbs/info.py b/src/verbs/info.py deleted file mode 100644 index 552df24..0000000 --- a/src/verbs/info.py +++ /dev/null @@ -1,71 +0,0 @@ -import os -import colors -import util -import shutil - -from verbs.install import find_package, retrieve_package_info, is_installed -from verbs.sync import sync - -def get_installed_info(package, config, options): - installed_info = {} - - info_file = util.add_path(options["r"], config["dir"]["installed"], package, "info") - with open(info_file, "r") as file: - for line in file: - line = line.strip() - key = line.split("=")[0] - value = "=".join(line.split("=")[1:]) - - installed_info[key] = value - - return installed_info - -def package_info(package, config, options): - checksum, sources, repo, size, files = find_package(package, config["repos"], config["dir"]["packages"], config["sources"]) - - if not checksum is None: - info = retrieve_package_info( - sources, checksum, package, config, - verbose=options["v"], skip_verification=options["u"] - ) - installed = is_installed(package, config, options["r"]) - installed_info = get_installed_info(package, config, options) if installed else {} - - print(colors.CYAN + f"Information for {package}:") - print(colors.CYAN + "\tName: " + colors.LIGHT_CYAN + f"{info['NAME']}") - print(colors.CYAN + "\tDescription: " + colors.LIGHT_CYAN + f"{info['DESCRIPTION']}") - print(colors.CYAN + "\tRepo: " + colors.LIGHT_CYAN + f"{repo}") - print(colors.CYAN + "\tChecksum: " + colors.LIGHT_CYAN + f"{info['CHECKSUM']}") - print(colors.CYAN + "\tVersion Hash: " + colors.LIGHT_CYAN + f"{info['VERSION']}") - print(colors.CYAN + "\tBuild Date: " + colors.LIGHT_CYAN + f"{info['DATE']}") - print(colors.CYAN + "\tSource: " + colors.LIGHT_CYAN + f"{info['SOURCE']}") - print(colors.CYAN + "\tDependencies: " + colors.LIGHT_CYAN + f"{info['DEPS']}") - print(colors.CYAN + "\tInstalled: " + colors.LIGHT_CYAN + f"{installed}") - - if installed: - print(colors.CYAN + "\t\tDate: " + colors.LIGHT_CYAN + f"{installed_info['INSTALL_DATE']}") - print(colors.CYAN + "\t\tChecksum: " + colors.LIGHT_CYAN + f"{installed_info['CHECKSUM']}") - print(colors.CYAN + "\t\tURL: " + colors.LIGHT_CYAN + f"{installed_info['URL']}") - print(colors.CYAN + "\t\tValidation Key: " + colors.LIGHT_CYAN + f"{installed_info['KEY']}") - else: - print(colors.RED + f"Package {package} could not be found") - - -def info(args, options, config): - if not options["l"]: - sync(args, options, config) - - if len(args) == 0: - installed_path = util.add_path(options["r"], config["dir"]["installed"]) - installed = os.listdir(installed_path) - if len(installed) > 0: - [args.append(i) for i in installed] - else: - print(colors.RED + f"No packages have been specified nor installed") - - for package in args: - package_info(package, config, options) - - - - diff --git a/src/verbs/install.py b/src/verbs/install.py deleted file mode 100644 index 0690da3..0000000 --- a/src/verbs/install.py +++ /dev/null @@ -1,471 +0,0 @@ -import os -import re -import util -import colors -import time -import requests -import hashlib - -from verbs.sync import sync, run_post_install - -def get_best_source(available, sources_list="/var/lib/xipkg/sources"): - source_speeds = {} - with open(sources_list, "r") as file: - for line in file.readlines(): - split = line.split(" ") - if len(split) > 0: - try: - if split[0] in available: - source_speeds[split[0]] = float(split[1]) - except: - pass - - return sorted(source_speeds.keys(), key=lambda k: source_speeds[k]) - - -def find_package(query, repos, packages_dir, sources): - for repo in repos: - repo_dir = os.path.join(packages_dir, repo) - files = os.listdir(repo_dir) - - if query in files: - requested_repo = repo - with open(os.path.join(repo_dir, query)) as file: - checksum = file.readline().strip().split("=")[-1] - size = file.readline().strip().split("=")[-1] - filecount = file.readline().strip().split("=")[-1] - listed_sources = file.readline().strip().split("=")[-1].split() - found_sources = { - source: util.add_path(url, repo) - for source, url in sources.items() - if source in listed_sources - } - return checksum, found_sources, requested_repo, int(size)*1000, int(filecount) - return None, [], None, 0, 0 - - -def verify_signature(package_file, package_info, - cache_dir="/var/cache/xipkg", keychain_dir="/var/lib/xipkg/keychain", - verbose=False): - - checksum = package_info["CHECKSUM"] - - sig_cached_path = util.add_path(cache_dir, checksum + ".sig") - with open(sig_cached_path, "wb") as file: - file.write(package_info["SIGNATURE"]) - - if os.path.exists(keychain_dir): - keys = os.listdir(keychain_dir) - for key in keys: - key_path = util.add_path(keychain_dir, key) - - command = f"openssl dgst -verify {key_path} -signature {sig_cached_path} {package_file}" - - if "OK" in os.popen(command).read(): - return key - elif verbose: - print(colors.RED - + f"Failed to verify signature against {key}" - + colors.RESET) - - elif verbose: - print(colors.BLACK + "There are no keys to verify with") - return "" - - -def retrieve_package_info(sources, checksum, package_name, config, - verbose=False, skip_verification=False): - - sources_list=config["dir"]["sources"] - cache_dir=config["dir"]["cache"] - - # TODO we may potentially do this a few times while resolving deps, might want to cache things here - # TODO or find cached package checksum from the cache folder - for source in get_best_source(sources, sources_list=sources_list): - url = sources[source] - - package_info_url = util.add_path(url, package_name + ".xipkg.info") - status, response = util.curl(package_info_url, raw=True) - - if status == 200: - info = parse_package_info(response) - if info["CHECKSUM"] == checksum or skip_verification: - return info - else: - if verbose: - print(colors.RED - + f"Checksum verification failed for {package_name} in {source}" - + colors.RESET) - if verbose: - print(colors.RED + f"No matching hashes found" + colors.RESET) - return {} - -# Does not verify the package itself, will only blindly accept the best size it can -def query_package_size(sources, package_info, package_name, config, verbose=False): - sources_list=config["dir"]["sources"] - for source in get_best_source(sources, sources_list=sources_list): - url = sources[source] - if verbose: - print(colors.LIGHT_BLACK + f"using source {source} at {url} for {package_name}") - - package_url = util.add_path(url, package_name + ".xipkg") - size = util.query_size(package_url) - if size > 0: - return size - return 0 - -def retrieve_package(sources, package_info, package_name, config, completed=0, total_download=-1, - verbose=False, skip_verification=False): - - sources_list=config["dir"]["sources"] - cache_dir=config["dir"]["cache"] - keychain_dir=config["dir"]["keychain"] - - checksum = package_info["CHECKSUM"] - - for source in get_best_source(sources, sources_list=sources_list): - url = sources[source] - if verbose: - print(colors.LIGHT_BLACK + f"using source {source} at {url}") - package_url = util.add_path(url, package_name + ".xipkg") - package_dir = util.add_path(cache_dir, source) - - util.mkdir(package_dir) - - if total_download == -1: - text = package_name + ".xipkg" - else: - text = "packages..." - - # TODO if package already downloaded maybe just use cached version - status, package_path, size = util.curl_to_file(package_url, util.add_path(package_dir, package_name + ".xipkg"), - start=completed, total=total_download, text=text) - - if status == 200: - downloaded_checksum = util.md5sum(package_path) - - if not skip_verification: - if downloaded_checksum == checksum: - sig = verify_signature(package_path, package_info, - cache_dir=cache_dir, keychain_dir=keychain_dir, verbose=verbose) - if len(sig) > 0: - return package_path, source, sig, size - elif verbose: - print(colors.RED - + f"Failed to verify signature for {package_name} in {source}" - + colors.RESET) - elif verbose: - print(colors.RED - + f"Checksum verification failed for {package_name} in {source}" - + colors.RESET) - else: - return package_path, source, "none", size - print(colors.RESET + colors.RED + f"No valid packages found for {package_name}" + colors.RESET) - return "", "", "", 0 - -def parse_package_info(packageinfo): - info = {} - lines = packageinfo.split(b"\n") - - index = 0 - while index < len(lines): - line = lines[index] - split = line.split(b"=") - if len(split) > 1: - if split[0] == b"SIGNATURE": - index += 1 - digest = b"\n".join(lines[index:]) - info["SIGNATURE"] = digest - break; - else: - info[str(split[0], "utf-8")] = str(b"=".join(split[1:]), "utf-8") - index += 1 - return info - -def get_available_version(package_name, config, root="/"): - repos = config["repos"] - packages_dir = config["dir"]["packages"] - sources = config["sources"] - checksum, found_sources, requested_repo, size, files = find_package(package_name, repos, packages_dir, sources) - return checksum - -def get_installed_version(package, config, root="/"): - - installed_info = util.add_path(root, config["dir"]["installed"], package, "info") - if os.path.exists(installed_info): - with open(installed_info) as file: - for line in file: - if line.startswith("CHECKSUM"): - return line.strip().split("=")[-1] - return None - -def update_needed(package, new_checksum, config, root="/"): - version = get_installed_version(package, config, root) - return not new_checksum == version - -def resolve_dependencies(package_info): - d = [ - dep - for dep in re.findall("[\w,-]*", package_info["DEPS"]) - if len(dep) > 0 - ] - return d - -def find_all_dependencies(package_names, options, config): - # this is all assuming that the order of deps installed doesn't matter - failed = [] - to_check = [p for p in package_names] - dependencies = {} - - while len(to_check) > 0: - util.loading_bar(len(dependencies), len(dependencies) + len(to_check), "Resolving dependencies...") - dep = to_check.pop() - - dep_checksum, dep_sources, dep_repo, size, files = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"]) - - if dep_checksum is not None: - dependencies[dep] = dep_checksum - - info = retrieve_package_info( - dep_sources, dep_checksum, dep, config, - verbose=options["v"], skip_verification=options["u"] - ) - - if len(info) > 0: - [to_check.append(d) for d in resolve_dependencies(info) if not (d in dependencies or d in to_check)] - - else: - if not dep in failed: failed.append(dep) - if options["v"]: - util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to retrieve info for {dep}") - else: - if not dep in failed: failed.append(dep) - if options["v"]: util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to find package {dep}") - - util.loading_bar(len(dependencies), len(dependencies) + len(to_check), "Resolved dependencies") - print(colors.RESET) - - to_install = [] - to_update = [] - for dep,checksum in dependencies.items(): - if not is_installed(dep, config, options["r"]): - to_install.append(dep) - elif update_needed(dep, checksum, config, options["r"]): - to_update.append(dep) - - # assuming that the latter packages are core dependencies - # we can reverse the array to reflect the more important packages to install - to_install.reverse() - to_update.reverse() - return to_install, to_update, failed - -def is_installed(package_name, config, root="/"): - installed_dir = util.add_path(root, config["dir"]["installed"]) - if os.path.exists(installed_dir): - files = os.listdir(installed_dir) - return package_name in files - return False - -def install_package(package_name, package_path, package_info, - repo, source_url, key, post_install, - config, verbose=False, root="/"): - - # TODO loading bar here - files = util.extract_tar(package_path, root) - if post_install: - run_post_install(config, verbose=verbose, root=root) - save_installed_info(package_name, package_info, files, repo, source_url, key, config, root=root) - return files - - - -def save_installed_info(package_name, package_info, - files, repo, source_url, key, - config, root=""): - installed_dir = util.add_path(root, config["dir"]["installed"], package_name) - util.mkdir(installed_dir) - - name = package_info["NAME"] - description = package_info["DESCRIPTION"] if "DESCRIPTION" in package_info else "" - installed_checksum = package_info["CHECKSUM"] - build_date = package_info["DATE"] - version = package_info["VERSION"] - installed_date = os.popen("date").read() - - package_url = util.add_path(source_url, repo, package_name + ".xipkg") - - info_file = util.add_path(installed_dir, "info") - with open(info_file, "w") as file: - file.write(f"NAME={name}\n") - file.write(f"DESCRIPTION={description}\n") - file.write(f"CHECKSUM={installed_checksum}\n") - file.write(f"VERSION={version}\n") - file.write(f"INSTALL_DATE={installed_date}") - file.write(f"BUILD_DATE={build_date}\n") - file.write(f"KEY={key}\n") - file.write(f"URL={package_url}\n") - file.write(f"REPO={repo}\n") - - files_file = util.add_path(installed_dir, "files") - with open(files_file, "w") as file: - file.write(files) - - -def install_single(package, options, config, post_install=True, verbose=False, unsafe=False): - checksum, sources, repo, size, files = find_package(package, config["repos"], - config["dir"]["packages"], config["sources"]) - - info = retrieve_package_info( - sources, checksum, package, config, - verbose=verbose, skip_verification=unsafe - ) - - package_path, source, key = retrieve_package(sources, - info, package, config, - verbose=verbose, skip_verification=unsafe) - - files = install_package(package, package_path, info, - repo, sources[source], key, post_install, - config, verbose=verbose, root=options["r"]) - - -def install_multiple(to_install, args, options, config, terminology=("install", "installed", "installing")): - v = options["v"] - unsafe = options["u"] - - length = 0 - total_files = 0 - infos = [] - for package in to_install: - util.loading_bar(len(infos), len(to_install), "Preparing Download") - checksum, sources, repo, size, filecount = find_package(package, config["repos"], - config["dir"]["packages"], config["sources"]) - - if checksum != None: - info = retrieve_package_info( - sources, checksum, package, config, - verbose=v, skip_verification=unsafe - ) - - # TODO make package_size be written in package info or sync list instead - length += int(size) - total_files += int(filecount) - - infos.append( - (package, sources, repo, info) - ) - - divisor, unit = util.get_unit(length) - - util.loading_bar(len(infos), len(to_install), "Preparing Download") - print(colors.RESET + colors.CLEAR_LINE, end="\r") - - print(colors.WHITE + "Total download size: " + colors.LIGHT_WHITE + str(round(length / divisor, 2)) + unit) - - if options["y"] or util.ask_confirmation(colors.WHITE + "Continue?"): - # TODO try catch over each package in each stage so that we can know if there are errors - - downloaded = 0 - pkg_files = [] - for package_info in infos: - (package, sources, repo, info) = package_info - - if options["v"]: - print(colors.BLACK + f"Fetching {package}") - package_path, source, key, size = retrieve_package(sources, - info, package, config, - completed=downloaded, total_download=length, - verbose=v, skip_verification=unsafe) - - if package_path == "": - print(colors.RED + f"Failed to download {package}") - else: - downloaded += size - - pkg_files.append( - (package, package_path, sources[source], key, repo, info) - ) - - util.loading_bar(int(length/divisor), int(length/divisor), "Downloaded packages", unit=unit) - print(colors.RESET) - - extracted = 0 - for f in pkg_files: - util.loading_bar(extracted, total_files, terminology[2].capitalize() + " files") - - (package, package_path, source, key, repo, info) = f - - files = install_package(package, package_path, info, - repo, source, key, options["r"] == "/", - config, verbose=v, root=options["r"]) - extracted += len(files.split("\n")) - - util.loading_bar(extracted, total_files, terminology[1].capitalize() + " files") - print(colors.RESET) - else: - print(colors.RED + "Action cancelled by user") - - -def install(args, options, config): - if not options["l"]: - sync(args, options, config) - - sources = config["sources"] - repos = config["repos"] - - v = options["v"] - unsafe = options["u"] - - packages_dir = config["dir"]["packages"] - - # have some interaction with sudo when necessary rather than always require it - # this check may need to be done sooner? - if util.is_root() or options["r"] != "/": - to_install, to_update, location_failed = args, [], [] - if options["n"]: - for dep in to_install: - dep_checksum, dep_sources, dep_repo, size, files = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"]) - if dep_checksum is None: - to_install.remove(dep) - location_failed.append(dep) - - else: - to_install, to_update, location_failed = find_all_dependencies(args, options, config) - - - if len(location_failed) > 0: - print(colors.RED + "Failed to locate the following packages:") - print(end="\t") - for d in location_failed: - print(colors.RED if d in args else colors.LIGHT_RED, d, end="") - print() - - together = [] - [together.append(p) for p in to_install] - [together.append(p) for p in to_update] - - if len(together) > 0: - - if len(to_install) > 0: - print(colors.BLUE + f"The following will be installed:") - print(end="\t") - for d in to_install: - print(colors.BLUE if d in args else colors.LIGHT_BLUE, d, end="") - print() - if len(to_update) > 0: - print(colors.GREEN + f"The following will be updated:") - print(end="\t") - for d in to_update: - print(colors.GREEN if d in args else colors.LIGHT_GREEN, d, end="") - print() - - install_multiple(together, args, options, config) - else: - installed = " ".join([arg for arg in args - if is_installed(arg, config, options["r"])]) - if len(installed) > 0: - print(colors.CYAN + "Already installed", colors.LIGHT_CYAN + installed) - else: - print(colors.LIGHT_BLACK + "Nothing to do") - else: - print(colors.RED + "Root is required to install packages") diff --git a/src/verbs/remove.py b/src/verbs/remove.py deleted file mode 100644 index bc1a7e8..0000000 --- a/src/verbs/remove.py +++ /dev/null @@ -1,63 +0,0 @@ -import os -import colors -import util -import shutil - -from verbs.sync import sync -from verbs.install import is_installed - -BAR_COLOR = colors.BLACK + colors.BG_RED -BAR_COLOR_RESET = colors.BG_BLACK + colors.RED - -def list_files(package_name, config, root="/"): - file_list = util.add_path(root, config["dir"]["installed"], package_name, "files") - with open(file_list, "r") as file: - return [util.add_path(root, line.strip()) for line in file] - -def remove_package(package, options, config): - if is_installed(package, config, options["r"]): - files = list_files(package, config, options["r"]) - done = 0 - for file in files: - util.loading_bar(done, len(files), f"Removing {package}", color=BAR_COLOR, reset=BAR_COLOR_RESET) - if os.path.exists(file): - os.remove(file) - if options["v"]: - print(colors.GREEN + f"{file} removed") - - # TODO delete the file's parent dirs if they are empty - else: - if options["v"]: - print(colors.RED + f"{file} is missing: not removed!") - done += 1 - - - installed_path = util.add_path(options["r"], config["dir"]["installed"], package) - shutil.rmtree(installed_path) - util.loading_bar(done, len(files), f"Removed {package}", color=BAR_COLOR, reset=BAR_COLOR_RESET) - print() - else: - print(colors.LIGHT_RED + package + colors.RED + " is not installed") - -def remove(args, options, config): - if not options["l"]: - sync(args, options, config) - - # potential to find all the orphaned deps or something, but that would require knowing why someone installed a package, so you dont lose packages that you want - - uninstall = [package for package in args if is_installed(package, config, options["r"])] - not_found = [package for package in args if not package in uninstall] - - if len(not_found) > 0: - print(colors.RED + ", ".join(not_found), "are" if len(not_found) > 1 else "is", "not installed!") - if len(uninstall) > 0: - print(colors.CLEAR_LINE + colors.RESET, end="") - print(colors.RED + "The following packages will be removed:") - print(end="\t") - for d in uninstall: - print(colors.RED , d, end="") - print() - - if util.ask_confirmation(colors.RED + "Continue?", no_confirm=options["y"]): - for package in uninstall: - remove_package(package, options, config) diff --git a/src/verbs/search.py b/src/verbs/search.py deleted file mode 100644 index 498a88e..0000000 --- a/src/verbs/search.py +++ /dev/null @@ -1,40 +0,0 @@ -import os -import sys -import colors -import util -import shutil - -from verbs.install import find_package, retrieve_package_info -from verbs.sync import sync - -def list_repos(repos, packages_dir, sources): - return [ - f"{repo}/{file}" for repo in repos for file in os.listdir(os.path.join(packages_dir, repo)) - ] - -def search(args, options, config): - if not options["l"]: - sync(args, options, config) - - if len(args) > 0: - packages = list_repos(config["repos"], config["dir"]["packages"], config["sources"]) - for package in args: - - # TODO fuzzy searching here - results = [p for p in packages if package.lower() in p.lower()] - - if len(results) > 0: - print(colors.GREEN + f"Search results for {package}:") - for r in results: - print(colors.LIGHT_GREEN + f"\t{r}") - - print(colors.RESET, end="") - sys.exit(0) - else: - print(colors.RED + f"Package {package} could not be found") - print(colors.RESET, end="") - sys.exit(1) - else: - print(colors.LIGHT_RED + "Nothing to do") - - diff --git a/src/verbs/sync.py b/src/verbs/sync.py deleted file mode 100644 index b0210af..0000000 --- a/src/verbs/sync.py +++ /dev/null @@ -1,258 +0,0 @@ -import os -import util -import colors -import shutil -import time -import sys - -CACHE_DIR = "/var/cache/xipkg" - -def run_post_install(config, verbose=False, root="/"): - """ Scan and run postinstall scripts - - Args: - config: (dict) The xipkg config - verbose: (bool, optional) Whether to print debug messages - root: (str) The system root to begin searching - """ - - if root == "/": - installed_dir = util.add_path(root, config["dir"]["postinstall"]) - if os.path.exists(installed_dir): - files = os.listdir(installed_dir) - if len(files) > 0: - done = 0 - for file in files: - util.loading_bar(done, len(files), f"Running Postinstalls...") - f = util.add_path(config["dir"]["postinstall"], file) - command = f"sh {f}" - os.chdir("/") - os.system(command) - os.remove(f) - done += 1 - util.loading_bar(len(files), len(files), f"Run Postinstalls") - print(colors.RESET) - - -def list_packages(url): - """ List all of the packages available in a repo - - Will return a parsed version of /packages.list and the time it took to retrieve this - - Args: - url: (str) The repository's URL - - Returns: - dict: - A dictionary listing all packages and a string of their info summary - example: { - "linux" : "abcdef 100 200" - } - int: - The time in milliseconds for the request to complete - """ - - start = time.time() - status, response = util.curl(url + "/packages.list") - duration = (time.time() - start) * 1000 - - if status != 200: - return {}, -1 - - return { - line.split()[0].split(".")[0]: " ".join(line.split()[1:]) - for line in response.split("\n") if len(line.split()) > 0 - }, (duration / len(response)) if len(response) > 0 else float('inf') - - -def sync_packages(repo, sources, verbose=False): - """ - Get a list of the versions available for all packages in a repo - - Args: - repo: (str) The name of the repo to search - sources: (dict) a dictionary of the sources and their urls - verbose: (bool, optional) Whether to print debug messages - Returns: - dict: - Versions of each available package - """ - - versions = {} - speeds = {} - - for source,url in sources.items(): - listed, speed = list_packages(url + repo if url[-1] == "/" else f"/{repo}") - - if speed > 0: speeds[source] = speed - - if verbose: - print( - (colors.RED + f"No packages found in {source}/{repo}" + colors.RESET) - if len(listed) == 0 else - (colors.BLACK + f"{len(listed)} packages found in {source}/{repo}" + colors.RESET) - ) - - for p in listed: - if not p in versions: versions[p] = [] - versions[p].append((listed[p], source)) - - return versions, speeds - -def validate_package(package, versions, repo, verbose=False): - popularity = {} - for v in versions: - info = v[0] - source = v[1] - if not info in popularity: - popularity[info] = 0 - popularity[info] += 1 - - most_popular = "" - p_count = -1 - for p,c in popularity.items(): - if c > p_count: - most_popular = p - p_count = c - - sources = [v[1] for v in versions if v[0] == most_popular] - - # change the packages dict to list all the sources - # maybe some validation here - if len(most_popular.split()) > 2: - info = { - "checksum": most_popular.split()[0], - "size": most_popular.split()[1], - "files": most_popular.split()[2], - "sources" : sources - } - else: - info = { - "checksum": most_popular.split()[0], - "size": "0", - "files": "0", - "sources" : sources - } - return info - -def save_package(package, info, location): - util.mkdir(location) - package_file = os.path.join(location, package) - - exists = False - if os.path.exists(package_file): - with open(package_file, "r") as file: - text = file.read() - exists = info["checksum"] in text - - content = "" - with open(package_file, "w") as file: - file.write("checksum=" + info["checksum"] + "\n") - file.write("size=" + info["size"] + "\n") - file.write("files=" + info["files"] + "\n") - file.write("sources=" + " ".join([source for source in info["sources"]])) - - return exists - - -def test_source(source, url): - # requesting a resource may not be the best way to do this, caching etc - start = time.time() - code, reponse = util.curl(util.add_path(url, "index.html")) - if code == 200: - return int((time.time() - start) * 1000) - else: - return -1 - -def test_sources(sources, file_path, test_count=10): - if test_count > 0: - pings = {} - checked = 0 - for source,url in sources.items(): - total = 0 - for i in range(test_count): - total += test_source(source, url) - util.loading_bar(checked, len(sources) * test_count, f"Pinging Sources") - checked += 1 - if total > 0: - pings[source] = int(total / test_count) if total > 0 else 0 - - - sorted(pings) - - with open(file_path, "w") as file: - for source,ping in pings.items(): - file.write(f"{source} {ping}\n") - - util.loading_bar(checked, len(sources) * test_count, f"Pinged Sources") - print() - - -def sync(args, options, config): - sources = config["sources"] - repos = config["repos"] - - v = options["v"] - - new = 0 - - run_post_install(config, verbose=options["v"], root=options["r"]) - - for repo in repos: - if v: print(colors.LIGHT_BLACK + f"downloading package lists for {repo}...") - - packages, speeds = sync_packages(repo, sources, verbose=v) - if v: print(colors.LIGHT_BLACK + f"downloaded {len(packages)} packages from {len(sources)} sources") - - sorted(speeds) - with open(config["dir"]["sources"], "w") as file: - for source,ping in speeds.items(): - file.write(f"{source} {ping}\n") - - repo_dir = os.path.join(config["dir"]["packages"], repo) - if os.path.exists(repo_dir): - shutil.rmtree(repo_dir) - - # find the most popular hash to use - done = 0 - total = len(packages.items()) - for package, versions in packages.items(): - info = validate_package(package, versions, repo, verbose=v) - if not save_package(package, info, repo_dir): - new += 1 - done += 1 - util.loading_bar(done, total, f"Syncing {repo}") - - util.loading_bar(total, total, f"Synced {repo}") - print(colors.RESET) - - # this isnt new updates for install, this is new packages - #if new > 0: - # util.fill_line(f"There are {new} new updates", colors.LIGHT_GREEN) - - - -def import_key(name, url, config, verbose=False, root="/"): - keychain_dir = util.add_path(root, config["dir"]["keychain"]) - util.mkdir(keychain_dir) - key_path = os.path.join(keychain_dir, name + ".pub") - - if os.path.exists(key_path): - print(colors.RED + f"Skipping existing key with name {name}") - else: - try: - key_path = util.curl_to_file(url, key_path) - print(colors.GREEN + f"Imported {name}.pub") - except Exception as e: - print(colors.RED + f"Failed to import key:", colors.RED + str(e)) - -def keyimport(args, options, config): - if len(args) > 1: - alias = args[0] - url = args[1] - - import_key(alias, url, config, verbose=options["v"], root=options["r"]) - - else: - print(colors.RED + "Usage: keyimport <alias> <url>") - diff --git a/src/verbs/update.py b/src/verbs/update.py deleted file mode 100644 index 5b7a49f..0000000 --- a/src/verbs/update.py +++ /dev/null @@ -1,25 +0,0 @@ -import os -import util -import colors -import time - -from verbs.install import find_package, install -from verbs.sync import sync - -VERSION_COMPARED = "CHECKSUM" - -def get_installed_list(config, root="/"): - installed_dir = util.add_path(root, config["dir"]["installed"]) - if os.path.exists(installed_dir): - files = os.listdir(installed_dir) - return files - return [] - - -def update(args, options, config): - if not options["l"]: - sync(args, options, config) - - packages = [package for package in get_installed_list(config, options["r"]) if len(args) == 0 or package in args] - options["l"] = True - install(packages, options, config) |