diff options
| author | davidovski <david@davidovski.xyz> | 2022-02-13 23:22:33 +0000 | 
|---|---|---|
| committer | davidovski <david@davidovski.xyz> | 2022-02-13 23:22:33 +0000 | 
| commit | 9f529b0e85c7b38e97d8ebb0371f7a6859f882f4 (patch) | |
| tree | 8de89a716bce066a5497b8263a00d9c007425e85 /src/verbs | |
| parent | f545930c0535293a37f5c1730d8b83264cf098b5 (diff) | |
started rewrite with xisync
Diffstat (limited to 'src/verbs')
| -rw-r--r-- | src/verbs/__init__.py | 0 | ||||
| -rw-r--r-- | src/verbs/file.py | 62 | ||||
| -rw-r--r-- | src/verbs/files.py | 32 | ||||
| -rw-r--r-- | src/verbs/info.py | 71 | ||||
| -rw-r--r-- | src/verbs/install.py | 471 | ||||
| -rw-r--r-- | src/verbs/remove.py | 63 | ||||
| -rw-r--r-- | src/verbs/search.py | 40 | ||||
| -rw-r--r-- | src/verbs/sync.py | 258 | ||||
| -rw-r--r-- | src/verbs/update.py | 25 | 
9 files changed, 0 insertions, 1022 deletions
diff --git a/src/verbs/__init__.py b/src/verbs/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/src/verbs/__init__.py +++ /dev/null diff --git a/src/verbs/file.py b/src/verbs/file.py deleted file mode 100644 index 008635f..0000000 --- a/src/verbs/file.py +++ /dev/null @@ -1,62 +0,0 @@ -import os -import colors -import util -import shutil - -import re - -from verbs.sync import sync -from verbs.search import list_repos - -# since we symlink /bin to /usr, we should make sure we are always looking for the same place -def condition_file(file_path): -    file_path = re.sub("^/bin", "/usr/bin", file_path) -    file_path = re.sub("^/sbin", "/usr/bin", file_path) -    file_path = re.sub("^/usr/sbin", "/usr/bin", file_path) -    file_path = re.sub("^/lib", "/usr/lib", file_path) -    file_path = re.sub("^/lib64", "/usr/lib", file_path) -    file_path = re.sub("^/usr/lib64", "/usr/lib", file_path) -    return file_path - -def absolute_path(file_path, root="/"): -    if file_path[0] == "/": -        return file_path -    else: -        root_path = os.path.realpath(root) -        file_path = os.path.realpath(file_path) -        # this is a bad way of doing this -        file_path = file_path.replace(root_path, "") -        return file_path - -def list_files(package_name, config, root="/"): -    file_list = util.add_path(root, config["dir"]["installed"], package_name, "files") -    if os.path.exists(file_list): -        with open(file_list, "r") as file: -            return [condition_file(line.strip()) for line in file] -    else: -        return [] - -def list_all_files(config, root="/"): -    packages = [ p.split("/")[-1] for p in list_repos(config["repos"], config["dir"]["packages"], config["dir"]["sources"])] -    file_list = {} -    for package in packages: -        file_list[package] = list_files(package, config, root=root) -    return file_list - -def file(args, options, config): -    if len(args) > 0: -        file_list = list_all_files(config, options["r"]) -        for file in args: -            file = condition_file(absolute_path(file, options["r"])) -            found = False -            for package, files in file_list.items(): -                if file in files: -                    found = True -                    print(colors.LIGHT_CYAN + file, colors.CYAN + "belongs to", colors.LIGHT_CYAN + package) -                    break -            if not found: -                print(colors.RED + "Could not determine which package owns " + colors.LIGHT_CYAN + file) - - -    else: -        print(colors.LIGHT_RED + "Nothing to do") diff --git a/src/verbs/files.py b/src/verbs/files.py deleted file mode 100644 index 33936a9..0000000 --- a/src/verbs/files.py +++ /dev/null @@ -1,32 +0,0 @@ -import os -import colors -import util -import shutil - -import re - -from verbs.sync import sync -from verbs.search import list_repos -from verbs.file import condition_file - -def list_files(package_name, config, root="/"): -    file_list = util.add_path(root, config["dir"]["installed"], package_name, "files") -    if os.path.exists(file_list): -        with open(file_list, "r") as file: -            return [condition_file(line.strip()) for line in file] -    else: -        return [] - -def list_all_files(config, root="/"): -    packages = [ p.split("/")[-1] for p in list_repos(config["repos"], config["dir"]["packages"], config["dir"]["sources"])] -    file_list = {} -    for package in packages: -        file_list[package] = list_files(package, config, root=root) -    return file_list - -def files(args, options, config): -    if len(args) > 0: -        [print(f) for f in list_files(args[0], config, options["r"])] - -    else: -        print(colors.LIGHT_RED + "Nothing to do") diff --git a/src/verbs/info.py b/src/verbs/info.py deleted file mode 100644 index 552df24..0000000 --- a/src/verbs/info.py +++ /dev/null @@ -1,71 +0,0 @@ -import os -import colors -import util -import shutil - -from verbs.install import find_package, retrieve_package_info, is_installed -from verbs.sync import sync - -def get_installed_info(package, config, options): -    installed_info = {} - -    info_file = util.add_path(options["r"], config["dir"]["installed"], package, "info") -    with open(info_file, "r") as file: -        for line in file: -            line = line.strip() -            key = line.split("=")[0] -            value = "=".join(line.split("=")[1:]) - -            installed_info[key] = value - -    return installed_info - -def package_info(package, config, options): -            checksum, sources, repo, size, files = find_package(package, config["repos"], config["dir"]["packages"], config["sources"]) - -            if not checksum is None: -                info = retrieve_package_info( -                        sources, checksum, package, config,  -                        verbose=options["v"], skip_verification=options["u"] -                        ) -                installed = is_installed(package, config, options["r"]) -                installed_info = get_installed_info(package, config, options) if installed else {} -                 -                print(colors.CYAN + f"Information for {package}:") -                print(colors.CYAN + "\tName: " + colors.LIGHT_CYAN + f"{info['NAME']}") -                print(colors.CYAN + "\tDescription: " + colors.LIGHT_CYAN + f"{info['DESCRIPTION']}") -                print(colors.CYAN + "\tRepo: " + colors.LIGHT_CYAN + f"{repo}") -                print(colors.CYAN + "\tChecksum: " + colors.LIGHT_CYAN + f"{info['CHECKSUM']}") -                print(colors.CYAN + "\tVersion Hash: " + colors.LIGHT_CYAN + f"{info['VERSION']}") -                print(colors.CYAN + "\tBuild Date: " + colors.LIGHT_CYAN + f"{info['DATE']}") -                print(colors.CYAN + "\tSource: " + colors.LIGHT_CYAN + f"{info['SOURCE']}") -                print(colors.CYAN + "\tDependencies: " + colors.LIGHT_CYAN + f"{info['DEPS']}") -                print(colors.CYAN + "\tInstalled: " + colors.LIGHT_CYAN + f"{installed}") - -                if installed: -                    print(colors.CYAN + "\t\tDate: " + colors.LIGHT_CYAN + f"{installed_info['INSTALL_DATE']}") -                    print(colors.CYAN + "\t\tChecksum: " + colors.LIGHT_CYAN + f"{installed_info['CHECKSUM']}") -                    print(colors.CYAN + "\t\tURL: " + colors.LIGHT_CYAN + f"{installed_info['URL']}") -                    print(colors.CYAN + "\t\tValidation Key: " + colors.LIGHT_CYAN + f"{installed_info['KEY']}") -            else: -                print(colors.RED + f"Package {package} could not be found") - - -def info(args, options, config): -    if not options["l"]: -        sync(args, options, config) - -    if len(args) == 0: -        installed_path = util.add_path(options["r"], config["dir"]["installed"]) -        installed = os.listdir(installed_path) -        if len(installed) > 0: -            [args.append(i) for i in installed] -        else: -            print(colors.RED + f"No packages have been specified nor installed") - -    for package in args: -        package_info(package, config, options) - - - -         diff --git a/src/verbs/install.py b/src/verbs/install.py deleted file mode 100644 index 0690da3..0000000 --- a/src/verbs/install.py +++ /dev/null @@ -1,471 +0,0 @@ -import os -import re -import util -import colors -import time -import requests -import hashlib - -from verbs.sync import sync, run_post_install - -def get_best_source(available, sources_list="/var/lib/xipkg/sources"): -    source_speeds = {} -    with open(sources_list, "r") as file: -        for line in file.readlines(): -            split = line.split(" ") -            if len(split) > 0: -                try: -                    if split[0] in available: -                        source_speeds[split[0]] = float(split[1]) -                except: -                    pass - -    return sorted(source_speeds.keys(), key=lambda k: source_speeds[k]) -         - -def find_package(query, repos, packages_dir, sources): -    for repo in repos: -        repo_dir = os.path.join(packages_dir, repo) -        files = os.listdir(repo_dir) - -        if query in files: -            requested_repo = repo -            with open(os.path.join(repo_dir, query)) as file: -                checksum = file.readline().strip().split("=")[-1] -                size = file.readline().strip().split("=")[-1] -                filecount = file.readline().strip().split("=")[-1] -                listed_sources = file.readline().strip().split("=")[-1].split() -                found_sources = { -                            source: util.add_path(url, repo)  -                            for source, url in sources.items()  -                            if source in listed_sources -                        } -                return checksum, found_sources, requested_repo, int(size)*1000, int(filecount) -    return None, [], None, 0, 0 - - -def verify_signature(package_file, package_info,  -                            cache_dir="/var/cache/xipkg", keychain_dir="/var/lib/xipkg/keychain", -                            verbose=False): - -    checksum = package_info["CHECKSUM"] -     -    sig_cached_path = util.add_path(cache_dir, checksum + ".sig") -    with open(sig_cached_path, "wb") as file: -        file.write(package_info["SIGNATURE"]) - -    if os.path.exists(keychain_dir): -        keys = os.listdir(keychain_dir) -        for key in keys: -            key_path = util.add_path(keychain_dir, key) -             -            command = f"openssl dgst -verify {key_path} -signature {sig_cached_path} {package_file}"  - -            if "OK" in os.popen(command).read(): -                return key -            elif verbose: -                print(colors.RED  -                        + f"Failed to verify signature against {key}" -                        + colors.RESET) - -    elif verbose: -        print(colors.BLACK + "There are no keys to verify with") -    return "" - - -def retrieve_package_info(sources, checksum, package_name, config, -                            verbose=False, skip_verification=False): - -    sources_list=config["dir"]["sources"] -    cache_dir=config["dir"]["cache"] -     -    # TODO we may potentially do this a few times while resolving deps, might want to cache things here -    # TODO or find cached package checksum from the cache folder -    for source in get_best_source(sources, sources_list=sources_list): -        url = sources[source] - -        package_info_url = util.add_path(url, package_name + ".xipkg.info") -        status, response = util.curl(package_info_url, raw=True) -    -        if status == 200: -            info = parse_package_info(response) -            if info["CHECKSUM"] == checksum or skip_verification: -                return info -            else: -                if verbose: -                    print(colors.RED  -                            + f"Checksum verification failed for {package_name} in {source}"  -                            + colors.RESET) -    if verbose: -        print(colors.RED + f"No matching hashes found" + colors.RESET) -    return {} - -# Does not verify the package itself, will only blindly accept the best size it can -def query_package_size(sources, package_info, package_name, config, verbose=False): -    sources_list=config["dir"]["sources"] -    for source in get_best_source(sources, sources_list=sources_list): -        url = sources[source] -        if verbose: -            print(colors.LIGHT_BLACK + f"using source {source} at {url} for {package_name}") - -        package_url = util.add_path(url, package_name + ".xipkg") -        size = util.query_size(package_url) -        if size > 0: -            return size -    return 0 - -def retrieve_package(sources, package_info, package_name, config, completed=0, total_download=-1, -                            verbose=False, skip_verification=False): - -    sources_list=config["dir"]["sources"] -    cache_dir=config["dir"]["cache"] -    keychain_dir=config["dir"]["keychain"] -     -    checksum = package_info["CHECKSUM"] - -    for source in get_best_source(sources, sources_list=sources_list): -        url = sources[source] -        if verbose: -            print(colors.LIGHT_BLACK + f"using source {source} at {url}") -        package_url = util.add_path(url, package_name + ".xipkg") -        package_dir = util.add_path(cache_dir, source) - -        util.mkdir(package_dir) - -        if total_download == -1: -            text = package_name + ".xipkg" -        else: -            text = "packages..." - -        # TODO if package already downloaded maybe just use cached version -        status, package_path, size = util.curl_to_file(package_url, util.add_path(package_dir, package_name + ".xipkg"), -                start=completed, total=total_download, text=text) - -        if status == 200: -            downloaded_checksum = util.md5sum(package_path) -             -            if not skip_verification: -                if downloaded_checksum == checksum: -                    sig = verify_signature(package_path, package_info,  -                            cache_dir=cache_dir, keychain_dir=keychain_dir, verbose=verbose) -                    if len(sig) > 0: -                        return package_path, source, sig, size -                    elif verbose: -                        print(colors.RED  -                                + f"Failed to verify signature for {package_name} in {source}"  -                                + colors.RESET) -                elif verbose: -                        print(colors.RED  -                                + f"Checksum verification failed for {package_name} in {source}"  -                                + colors.RESET) -            else: -                return package_path, source, "none", size -    print(colors.RESET + colors.RED + f"No valid packages found for {package_name}" + colors.RESET) -    return "", "", "", 0 - -def parse_package_info(packageinfo): -    info = {} -    lines = packageinfo.split(b"\n") - -    index = 0 -    while index < len(lines): -        line = lines[index] -        split = line.split(b"=") -        if len(split) > 1: -            if split[0] == b"SIGNATURE": -                index += 1 -                digest = b"\n".join(lines[index:]) -                info["SIGNATURE"] = digest -                break; -            else: -                info[str(split[0], "utf-8")] = str(b"=".join(split[1:]), "utf-8") -        index += 1 -    return info - -def get_available_version(package_name, config, root="/"): -    repos = config["repos"] -    packages_dir = config["dir"]["packages"] -    sources = config["sources"] -    checksum, found_sources, requested_repo, size, files = find_package(package_name, repos, packages_dir, sources) -    return checksum -     -def get_installed_version(package, config, root="/"): - -    installed_info = util.add_path(root, config["dir"]["installed"], package, "info") -    if os.path.exists(installed_info): -        with open(installed_info) as file: -            for line in file: -                if line.startswith("CHECKSUM"): -                    return line.strip().split("=")[-1] -    return None - -def update_needed(package, new_checksum, config, root="/"): -    version = get_installed_version(package, config, root) -    return not new_checksum == version - -def resolve_dependencies(package_info): -    d = [ -                dep  -                for dep in re.findall("[\w,-]*", package_info["DEPS"])  -                if len(dep) > 0 -            ] -    return d  - -def find_all_dependencies(package_names, options, config): -    # this is all assuming that the order of deps installed doesn't matter -    failed = [] -    to_check = [p for p in package_names] -    dependencies = {} - -    while len(to_check) > 0: -        util.loading_bar(len(dependencies), len(dependencies) + len(to_check), "Resolving dependencies...") -        dep = to_check.pop() -         -        dep_checksum, dep_sources, dep_repo, size, files = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"]) - -        if dep_checksum is not None: -            dependencies[dep] = dep_checksum - -            info = retrieve_package_info( -                        dep_sources, dep_checksum, dep, config, -                        verbose=options["v"], skip_verification=options["u"] -                    ) - -            if len(info) > 0: -                    [to_check.append(d) for d in resolve_dependencies(info) if not (d in dependencies or d in to_check)] - -            else: -                if not dep in failed: failed.append(dep) -                if options["v"]: -                        util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to retrieve info for {dep}") -        else: -            if not dep in failed: failed.append(dep) -            if options["v"]: util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to find package {dep}") - -    util.loading_bar(len(dependencies), len(dependencies) + len(to_check), "Resolved dependencies") -    print(colors.RESET) - -    to_install = [] -    to_update = [] -    for dep,checksum in dependencies.items(): -        if not is_installed(dep, config, options["r"]): -            to_install.append(dep) -        elif update_needed(dep, checksum, config, options["r"]): -            to_update.append(dep) - -    # assuming that the latter packages are core dependencies -    # we can reverse the array to reflect the more important packages to install -    to_install.reverse() -    to_update.reverse() -    return to_install, to_update, failed - -def is_installed(package_name, config, root="/"): -    installed_dir = util.add_path(root, config["dir"]["installed"]) -    if os.path.exists(installed_dir): -        files = os.listdir(installed_dir) -        return package_name in files -    return False - -def install_package(package_name, package_path, package_info,  -        repo, source_url, key, post_install, -        config, verbose=False, root="/"): - -    # TODO loading bar here -    files = util.extract_tar(package_path, root) -    if post_install: -        run_post_install(config, verbose=verbose, root=root) -    save_installed_info(package_name, package_info, files, repo, source_url, key, config, root=root) -    return files - -     - -def save_installed_info(package_name, package_info, -        files, repo, source_url, key,  -        config, root=""): -    installed_dir = util.add_path(root, config["dir"]["installed"], package_name) -    util.mkdir(installed_dir) - -    name = package_info["NAME"] -    description = package_info["DESCRIPTION"] if "DESCRIPTION" in package_info else "" -    installed_checksum = package_info["CHECKSUM"] -    build_date = package_info["DATE"] -    version = package_info["VERSION"] -    installed_date = os.popen("date").read() - -    package_url = util.add_path(source_url, repo, package_name + ".xipkg") - -    info_file = util.add_path(installed_dir, "info") -    with open(info_file, "w") as file: -        file.write(f"NAME={name}\n") -        file.write(f"DESCRIPTION={description}\n") -        file.write(f"CHECKSUM={installed_checksum}\n") -        file.write(f"VERSION={version}\n") -        file.write(f"INSTALL_DATE={installed_date}") -        file.write(f"BUILD_DATE={build_date}\n") -        file.write(f"KEY={key}\n") -        file.write(f"URL={package_url}\n") -        file.write(f"REPO={repo}\n") - -    files_file = util.add_path(installed_dir, "files") -    with open(files_file, "w") as file: -        file.write(files) - - -def install_single(package, options, config, post_install=True, verbose=False, unsafe=False): -    checksum, sources, repo, size, files = find_package(package, config["repos"], -            config["dir"]["packages"], config["sources"]) - -    info = retrieve_package_info( -                sources, checksum, package, config, -                verbose=verbose, skip_verification=unsafe -            ) - -    package_path, source, key = retrieve_package(sources,  -            info, package, config,  -            verbose=verbose, skip_verification=unsafe) - -    files = install_package(package, package_path, info,  -            repo, sources[source], key, post_install, -            config, verbose=verbose, root=options["r"]) - - -def install_multiple(to_install, args, options, config, terminology=("install", "installed", "installing")): -    v = options["v"] -    unsafe = options["u"] - -    length = 0 -    total_files = 0 -    infos = [] -    for package in to_install: -        util.loading_bar(len(infos), len(to_install), "Preparing Download") -        checksum, sources, repo, size, filecount = find_package(package, config["repos"], -                config["dir"]["packages"], config["sources"]) - -        if checksum != None: -            info = retrieve_package_info( -                        sources, checksum, package, config, -                        verbose=v, skip_verification=unsafe -                ) - -            # TODO make package_size be written in package info or sync list instead -            length += int(size) -            total_files += int(filecount) - -            infos.append( -                    (package, sources, repo, info) -                    ) - -    divisor, unit = util.get_unit(length) - -    util.loading_bar(len(infos), len(to_install), "Preparing Download") -    print(colors.RESET + colors.CLEAR_LINE, end="\r") - -    print(colors.WHITE + "Total download size: " + colors.LIGHT_WHITE + str(round(length / divisor, 2)) + unit) - -    if options["y"] or util.ask_confirmation(colors.WHITE + "Continue?"): -        # TODO try catch over each package in each stage so that we can know if there are errors - -        downloaded = 0 -        pkg_files = [] -        for package_info in infos: -            (package, sources, repo, info) = package_info - -            if options["v"]: -                print(colors.BLACK + f"Fetching {package}") -            package_path, source, key, size = retrieve_package(sources,  -                    info, package, config,  -                    completed=downloaded, total_download=length, -                    verbose=v, skip_verification=unsafe) - -            if package_path == "": -                print(colors.RED + f"Failed to download {package}") -            else: -                downloaded += size - -                pkg_files.append( -                        (package, package_path, sources[source], key, repo, info) -                        ) -         -        util.loading_bar(int(length/divisor), int(length/divisor), "Downloaded packages", unit=unit) -        print(colors.RESET) - -        extracted = 0 -        for f in pkg_files: -            util.loading_bar(extracted, total_files, terminology[2].capitalize() + " files") - -            (package, package_path, source, key, repo, info) = f - -            files = install_package(package, package_path, info,  -                    repo, source, key, options["r"] == "/", -                    config, verbose=v, root=options["r"]) -            extracted += len(files.split("\n")) - -        util.loading_bar(extracted, total_files, terminology[1].capitalize() + " files") -        print(colors.RESET) -    else: -        print(colors.RED + "Action cancelled by user") - - -def install(args, options, config): -    if not options["l"]: -        sync(args, options, config) - -    sources = config["sources"] -    repos = config["repos"] - -    v = options["v"] -    unsafe = options["u"] - -    packages_dir = config["dir"]["packages"] - -    # have some interaction with sudo when necessary rather than always require it -    # this check may need to be done sooner? -    if util.is_root() or options["r"] != "/": -        to_install, to_update, location_failed = args, [], [] -        if options["n"]: -            for dep in to_install: -                dep_checksum, dep_sources, dep_repo, size, files = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"]) -                if dep_checksum is None: -                    to_install.remove(dep) -                    location_failed.append(dep) -     -        else: -            to_install, to_update, location_failed = find_all_dependencies(args, options, config) - - -        if len(location_failed) > 0: -            print(colors.RED + "Failed to locate the following packages:") -            print(end="\t") -            for d in location_failed: -                print(colors.RED if d in args else colors.LIGHT_RED, d, end="") -            print() - -        together = [] -        [together.append(p) for p in to_install] -        [together.append(p) for p in to_update] - -        if len(together) > 0: - -            if len(to_install) > 0: -                print(colors.BLUE + f"The following will be installed:") -                print(end="\t") -                for d in to_install: -                    print(colors.BLUE if d in args else colors.LIGHT_BLUE, d, end="") -                print() -            if len(to_update) > 0: -                print(colors.GREEN + f"The following will be updated:") -                print(end="\t") -                for d in to_update: -                    print(colors.GREEN if d in args else colors.LIGHT_GREEN, d, end="") -                print() - -            install_multiple(together, args, options, config) -        else: -            installed = " ".join([arg for arg in args -                if is_installed(arg, config, options["r"])]) -            if len(installed) > 0: -                print(colors.CYAN + "Already installed", colors.LIGHT_CYAN + installed) -            else: -                print(colors.LIGHT_BLACK + "Nothing to do") -    else: -        print(colors.RED + "Root is required to install packages") diff --git a/src/verbs/remove.py b/src/verbs/remove.py deleted file mode 100644 index bc1a7e8..0000000 --- a/src/verbs/remove.py +++ /dev/null @@ -1,63 +0,0 @@ -import os -import colors -import util -import shutil - -from verbs.sync import sync -from verbs.install import is_installed - -BAR_COLOR = colors.BLACK + colors.BG_RED -BAR_COLOR_RESET = colors.BG_BLACK + colors.RED - -def list_files(package_name, config, root="/"): -    file_list = util.add_path(root, config["dir"]["installed"], package_name, "files") -    with open(file_list, "r") as file: -        return [util.add_path(root, line.strip()) for line in file] - -def remove_package(package, options, config): -    if is_installed(package, config, options["r"]): -        files = list_files(package, config, options["r"]) -        done = 0 -        for file in files: -            util.loading_bar(done, len(files), f"Removing {package}", color=BAR_COLOR, reset=BAR_COLOR_RESET) -            if os.path.exists(file): -                os.remove(file) -                if options["v"]: -                    print(colors.GREEN + f"{file} removed")  - -                # TODO delete the file's parent dirs if they are empty -            else: -                if options["v"]: -                    print(colors.RED + f"{file} is missing: not removed!")  -            done += 1 -             -         -        installed_path = util.add_path(options["r"], config["dir"]["installed"], package) -        shutil.rmtree(installed_path) -        util.loading_bar(done, len(files), f"Removed {package}", color=BAR_COLOR, reset=BAR_COLOR_RESET) -        print() -    else: -        print(colors.LIGHT_RED + package + colors.RED + " is not installed") - -def remove(args, options, config): -    if not options["l"]: -        sync(args, options, config) - -    # potential to find all the orphaned deps or something, but that would require knowing why someone installed a package, so you dont lose packages that you want - -    uninstall = [package for package in args if is_installed(package, config, options["r"])] -    not_found = [package for package in args if not package in uninstall] - -    if len(not_found) > 0: -        print(colors.RED + ", ".join(not_found), "are" if len(not_found) > 1 else "is", "not installed!") -    if len(uninstall) > 0: -        print(colors.CLEAR_LINE + colors.RESET, end="") -        print(colors.RED + "The following packages will be removed:") -        print(end="\t") -        for d in uninstall: -            print(colors.RED , d, end="") -        print() - -        if util.ask_confirmation(colors.RED + "Continue?", no_confirm=options["y"]): -            for package in uninstall: -                remove_package(package, options, config) diff --git a/src/verbs/search.py b/src/verbs/search.py deleted file mode 100644 index 498a88e..0000000 --- a/src/verbs/search.py +++ /dev/null @@ -1,40 +0,0 @@ -import os -import sys -import colors -import util -import shutil - -from verbs.install import find_package, retrieve_package_info -from verbs.sync import sync - -def list_repos(repos, packages_dir, sources): -    return [ -            f"{repo}/{file}" for repo in repos for file in os.listdir(os.path.join(packages_dir, repo))  -            ] - -def search(args, options, config): -    if not options["l"]: -        sync(args, options, config) - -    if len(args) > 0: -        packages = list_repos(config["repos"], config["dir"]["packages"], config["sources"]) -        for package in args: - -            # TODO fuzzy searching here -            results = [p for p in packages if package.lower() in p.lower()] -     -            if len(results) > 0: -                print(colors.GREEN + f"Search results for {package}:") -                for r in results: -                    print(colors.LIGHT_GREEN + f"\t{r}") - -                print(colors.RESET, end="") -                sys.exit(0) -            else: -                print(colors.RED + f"Package {package} could not be found") -                print(colors.RESET, end="") -                sys.exit(1) -    else: -        print(colors.LIGHT_RED + "Nothing to do") - -         diff --git a/src/verbs/sync.py b/src/verbs/sync.py deleted file mode 100644 index b0210af..0000000 --- a/src/verbs/sync.py +++ /dev/null @@ -1,258 +0,0 @@ -import os -import util -import colors -import shutil -import time -import sys - -CACHE_DIR = "/var/cache/xipkg" - -def run_post_install(config, verbose=False, root="/"): -    """ Scan and run postinstall scripts  - -    Args: -        config: (dict) The xipkg config -        verbose: (bool, optional) Whether to print debug messages -        root: (str) The system root to begin searching -    """ - -    if root == "/": -        installed_dir = util.add_path(root, config["dir"]["postinstall"]) -        if os.path.exists(installed_dir): -            files = os.listdir(installed_dir) -            if len(files) > 0: -                done = 0 -                for file in files: -                    util.loading_bar(done, len(files), f"Running Postinstalls...") -                    f = util.add_path(config["dir"]["postinstall"], file) -                    command = f"sh {f}" -                    os.chdir("/") -                    os.system(command) -                    os.remove(f) -                    done += 1 -                util.loading_bar(len(files), len(files), f"Run Postinstalls") -                print(colors.RESET) - - -def list_packages(url): -    """ List all of the packages available in a repo  - -    Will return a parsed version of /packages.list and the time it took to retrieve this -                 -    Args: -        url: (str) The repository's URL - -    Returns: -        dict:  -            A dictionary listing all packages and a string of their info summary -            example: { -                "linux" : "abcdef 100 200" -            } -        int: -            The time in milliseconds for the request to complete -    """ - -    start = time.time() -    status, response = util.curl(url + "/packages.list") -    duration = (time.time() - start) * 1000 - -    if status != 200: -        return {}, -1 - -    return { -            line.split()[0].split(".")[0]: " ".join(line.split()[1:]) -            for line in response.split("\n") if len(line.split()) >  0 -            }, (duration / len(response)) if len(response) > 0 else float('inf') -         - -def sync_packages(repo, sources, verbose=False): -    """ -    Get a list of the versions available for all packages in a repo - -    Args: -        repo: (str) The name of the repo to search -        sources: (dict) a dictionary of the sources and their urls -        verbose: (bool, optional) Whether to print debug messages -    Returns: -        dict: -            Versions of each available package -    """ - -    versions = {} -    speeds = {} - -    for source,url in sources.items(): -        listed, speed = list_packages(url + repo if url[-1] == "/" else f"/{repo}") - -        if speed > 0: speeds[source] = speed - -        if verbose: -                print( -                        (colors.RED + f"No packages found in {source}/{repo}" + colors.RESET)  -                        if len(listed) == 0 else  -                        (colors.BLACK + f"{len(listed)} packages found in {source}/{repo}" + colors.RESET) -                    ) - -        for p in listed: -            if not p in versions: versions[p] = [] -            versions[p].append((listed[p], source)) - -    return versions, speeds - -def validate_package(package, versions, repo, verbose=False): -    popularity = {} -    for v in versions: -        info = v[0] -        source = v[1] -        if not info in popularity: -            popularity[info] = 0 -        popularity[info] += 1 - -    most_popular = "" -    p_count = -1 -    for p,c in popularity.items(): -        if c > p_count: -            most_popular = p -            p_count = c - -    sources = [v[1] for v in versions if v[0] == most_popular] -     -    # change the packages dict to list all the sources -    # maybe some validation here -    if len(most_popular.split()) > 2: -        info = { -                "checksum": most_popular.split()[0], -                "size": most_popular.split()[1], -                "files": most_popular.split()[2], -                "sources" : sources -                } -    else: -        info = { -                "checksum": most_popular.split()[0], -                "size": "0", -                "files": "0", -                "sources" : sources -                } -    return info - -def save_package(package, info, location): -    util.mkdir(location) -    package_file = os.path.join(location, package) -     -    exists = False -    if os.path.exists(package_file): -        with open(package_file, "r") as file: -            text = file.read() -            exists = info["checksum"] in text - -    content = "" -    with open(package_file, "w") as file: -        file.write("checksum=" + info["checksum"] + "\n") -        file.write("size=" + info["size"] + "\n") -        file.write("files=" + info["files"] + "\n") -        file.write("sources=" + " ".join([source for source in info["sources"]])) - -    return exists - - -def test_source(source, url): -    # requesting a resource may not be the best way to do this, caching etc -    start = time.time() -    code, reponse = util.curl(util.add_path(url, "index.html")) -    if code == 200: -        return int((time.time() - start) * 1000) -    else: -        return -1 - -def test_sources(sources, file_path, test_count=10): -    if test_count > 0: -        pings = {} -        checked = 0 -        for source,url in sources.items(): -            total = 0 -            for i in range(test_count): -                total += test_source(source, url) -                util.loading_bar(checked, len(sources) * test_count, f"Pinging Sources") -                checked += 1 -            if total > 0: -                pings[source] = int(total / test_count) if total > 0 else 0 - - -        sorted(pings) -         -        with open(file_path, "w") as file: -            for source,ping in pings.items(): -                file.write(f"{source} {ping}\n") - -        util.loading_bar(checked, len(sources) * test_count, f"Pinged Sources") -        print() - - -def sync(args, options, config): -    sources = config["sources"] -    repos = config["repos"] - -    v = options["v"] - -    new = 0 - -    run_post_install(config, verbose=options["v"], root=options["r"]) -     -    for repo in repos: -        if v: print(colors.LIGHT_BLACK + f"downloading package lists for {repo}...") - -        packages, speeds = sync_packages(repo, sources, verbose=v) -        if v: print(colors.LIGHT_BLACK + f"downloaded {len(packages)} packages from {len(sources)} sources") -         -        sorted(speeds) -        with open(config["dir"]["sources"], "w") as file: -            for source,ping in speeds.items(): -                file.write(f"{source} {ping}\n") -         -        repo_dir = os.path.join(config["dir"]["packages"], repo) -        if os.path.exists(repo_dir): -            shutil.rmtree(repo_dir) - -        # find the most popular hash to use -        done = 0  -        total = len(packages.items()) -        for package, versions in packages.items(): -            info = validate_package(package, versions, repo, verbose=v) -            if not save_package(package, info, repo_dir): -                new += 1 -            done += 1 -            util.loading_bar(done, total, f"Syncing {repo}") - -        util.loading_bar(total, total, f"Synced {repo}") -        print(colors.RESET) - -    # this isnt new updates for install, this is new packages -    #if new > 0: -    #    util.fill_line(f"There are {new} new updates", colors.LIGHT_GREEN) - - - -def import_key(name, url, config, verbose=False, root="/"): -    keychain_dir = util.add_path(root, config["dir"]["keychain"]) -    util.mkdir(keychain_dir) -    key_path = os.path.join(keychain_dir, name + ".pub") - -    if os.path.exists(key_path): -        print(colors.RED + f"Skipping existing key with name {name}") -    else: -        try: -            key_path = util.curl_to_file(url, key_path) -            print(colors.GREEN + f"Imported {name}.pub") -        except Exception as e: -            print(colors.RED + f"Failed to import key:", colors.RED + str(e)) - -def keyimport(args, options, config): -    if len(args) > 1: -        alias = args[0] -        url = args[1] -         -        import_key(alias, url, config, verbose=options["v"], root=options["r"]) - -    else: -        print(colors.RED + "Usage: keyimport <alias> <url>") - diff --git a/src/verbs/update.py b/src/verbs/update.py deleted file mode 100644 index 5b7a49f..0000000 --- a/src/verbs/update.py +++ /dev/null @@ -1,25 +0,0 @@ -import os -import util -import colors -import time - -from verbs.install import find_package, install -from verbs.sync import sync - -VERSION_COMPARED = "CHECKSUM" - -def get_installed_list(config, root="/"): -    installed_dir = util.add_path(root, config["dir"]["installed"]) -    if os.path.exists(installed_dir): -        files = os.listdir(installed_dir) -        return files -    return [] - - -def update(args, options, config): -    if not options["l"]: -        sync(args, options, config) - -    packages = [package for package in get_installed_list(config, options["r"]) if len(args) == 0 or package in args] -    options["l"] = True -    install(packages, options, config)  | 
