From 9f529b0e85c7b38e97d8ebb0371f7a6859f882f4 Mon Sep 17 00:00:00 2001 From: davidovski Date: Sun, 13 Feb 2022 23:22:33 +0000 Subject: started rewrite with xisync --- src/verbs/sync.py | 258 ------------------------------------------------------ 1 file changed, 258 deletions(-) delete mode 100644 src/verbs/sync.py (limited to 'src/verbs/sync.py') diff --git a/src/verbs/sync.py b/src/verbs/sync.py deleted file mode 100644 index b0210af..0000000 --- a/src/verbs/sync.py +++ /dev/null @@ -1,258 +0,0 @@ -import os -import util -import colors -import shutil -import time -import sys - -CACHE_DIR = "/var/cache/xipkg" - -def run_post_install(config, verbose=False, root="/"): - """ Scan and run postinstall scripts - - Args: - config: (dict) The xipkg config - verbose: (bool, optional) Whether to print debug messages - root: (str) The system root to begin searching - """ - - if root == "/": - installed_dir = util.add_path(root, config["dir"]["postinstall"]) - if os.path.exists(installed_dir): - files = os.listdir(installed_dir) - if len(files) > 0: - done = 0 - for file in files: - util.loading_bar(done, len(files), f"Running Postinstalls...") - f = util.add_path(config["dir"]["postinstall"], file) - command = f"sh {f}" - os.chdir("/") - os.system(command) - os.remove(f) - done += 1 - util.loading_bar(len(files), len(files), f"Run Postinstalls") - print(colors.RESET) - - -def list_packages(url): - """ List all of the packages available in a repo - - Will return a parsed version of /packages.list and the time it took to retrieve this - - Args: - url: (str) The repository's URL - - Returns: - dict: - A dictionary listing all packages and a string of their info summary - example: { - "linux" : "abcdef 100 200" - } - int: - The time in milliseconds for the request to complete - """ - - start = time.time() - status, response = util.curl(url + "/packages.list") - duration = (time.time() - start) * 1000 - - if status != 200: - return {}, -1 - - return { - line.split()[0].split(".")[0]: " ".join(line.split()[1:]) - for line in response.split("\n") if len(line.split()) > 0 - }, (duration / len(response)) if len(response) > 0 else float('inf') - - -def sync_packages(repo, sources, verbose=False): - """ - Get a list of the versions available for all packages in a repo - - Args: - repo: (str) The name of the repo to search - sources: (dict) a dictionary of the sources and their urls - verbose: (bool, optional) Whether to print debug messages - Returns: - dict: - Versions of each available package - """ - - versions = {} - speeds = {} - - for source,url in sources.items(): - listed, speed = list_packages(url + repo if url[-1] == "/" else f"/{repo}") - - if speed > 0: speeds[source] = speed - - if verbose: - print( - (colors.RED + f"No packages found in {source}/{repo}" + colors.RESET) - if len(listed) == 0 else - (colors.BLACK + f"{len(listed)} packages found in {source}/{repo}" + colors.RESET) - ) - - for p in listed: - if not p in versions: versions[p] = [] - versions[p].append((listed[p], source)) - - return versions, speeds - -def validate_package(package, versions, repo, verbose=False): - popularity = {} - for v in versions: - info = v[0] - source = v[1] - if not info in popularity: - popularity[info] = 0 - popularity[info] += 1 - - most_popular = "" - p_count = -1 - for p,c in popularity.items(): - if c > p_count: - most_popular = p - p_count = c - - sources = [v[1] for v in versions if v[0] == most_popular] - - # change the packages dict to list all the sources - # maybe some validation here - if len(most_popular.split()) > 2: - info = { - "checksum": most_popular.split()[0], - "size": most_popular.split()[1], - "files": most_popular.split()[2], - "sources" : sources - } - else: - info = { - "checksum": most_popular.split()[0], - "size": "0", - "files": "0", - "sources" : sources - } - return info - -def save_package(package, info, location): - util.mkdir(location) - package_file = os.path.join(location, package) - - exists = False - if os.path.exists(package_file): - with open(package_file, "r") as file: - text = file.read() - exists = info["checksum"] in text - - content = "" - with open(package_file, "w") as file: - file.write("checksum=" + info["checksum"] + "\n") - file.write("size=" + info["size"] + "\n") - file.write("files=" + info["files"] + "\n") - file.write("sources=" + " ".join([source for source in info["sources"]])) - - return exists - - -def test_source(source, url): - # requesting a resource may not be the best way to do this, caching etc - start = time.time() - code, reponse = util.curl(util.add_path(url, "index.html")) - if code == 200: - return int((time.time() - start) * 1000) - else: - return -1 - -def test_sources(sources, file_path, test_count=10): - if test_count > 0: - pings = {} - checked = 0 - for source,url in sources.items(): - total = 0 - for i in range(test_count): - total += test_source(source, url) - util.loading_bar(checked, len(sources) * test_count, f"Pinging Sources") - checked += 1 - if total > 0: - pings[source] = int(total / test_count) if total > 0 else 0 - - - sorted(pings) - - with open(file_path, "w") as file: - for source,ping in pings.items(): - file.write(f"{source} {ping}\n") - - util.loading_bar(checked, len(sources) * test_count, f"Pinged Sources") - print() - - -def sync(args, options, config): - sources = config["sources"] - repos = config["repos"] - - v = options["v"] - - new = 0 - - run_post_install(config, verbose=options["v"], root=options["r"]) - - for repo in repos: - if v: print(colors.LIGHT_BLACK + f"downloading package lists for {repo}...") - - packages, speeds = sync_packages(repo, sources, verbose=v) - if v: print(colors.LIGHT_BLACK + f"downloaded {len(packages)} packages from {len(sources)} sources") - - sorted(speeds) - with open(config["dir"]["sources"], "w") as file: - for source,ping in speeds.items(): - file.write(f"{source} {ping}\n") - - repo_dir = os.path.join(config["dir"]["packages"], repo) - if os.path.exists(repo_dir): - shutil.rmtree(repo_dir) - - # find the most popular hash to use - done = 0 - total = len(packages.items()) - for package, versions in packages.items(): - info = validate_package(package, versions, repo, verbose=v) - if not save_package(package, info, repo_dir): - new += 1 - done += 1 - util.loading_bar(done, total, f"Syncing {repo}") - - util.loading_bar(total, total, f"Synced {repo}") - print(colors.RESET) - - # this isnt new updates for install, this is new packages - #if new > 0: - # util.fill_line(f"There are {new} new updates", colors.LIGHT_GREEN) - - - -def import_key(name, url, config, verbose=False, root="/"): - keychain_dir = util.add_path(root, config["dir"]["keychain"]) - util.mkdir(keychain_dir) - key_path = os.path.join(keychain_dir, name + ".pub") - - if os.path.exists(key_path): - print(colors.RED + f"Skipping existing key with name {name}") - else: - try: - key_path = util.curl_to_file(url, key_path) - print(colors.GREEN + f"Imported {name}.pub") - except Exception as e: - print(colors.RED + f"Failed to import key:", colors.RED + str(e)) - -def keyimport(args, options, config): - if len(args) > 1: - alias = args[0] - url = args[1] - - import_key(alias, url, config, verbose=options["v"], root=options["r"]) - - else: - print(colors.RED + "Usage: keyimport ") - -- cgit v1.2.1