summaryrefslogtreecommitdiff
path: root/src/verbs
diff options
context:
space:
mode:
authordavidovski <david@davidovski.xyz>2022-02-13 23:22:33 +0000
committerdavidovski <david@davidovski.xyz>2022-02-13 23:22:33 +0000
commit9f529b0e85c7b38e97d8ebb0371f7a6859f882f4 (patch)
tree8de89a716bce066a5497b8263a00d9c007425e85 /src/verbs
parentf545930c0535293a37f5c1730d8b83264cf098b5 (diff)
started rewrite with xisync
Diffstat (limited to 'src/verbs')
-rw-r--r--src/verbs/__init__.py0
-rw-r--r--src/verbs/file.py62
-rw-r--r--src/verbs/files.py32
-rw-r--r--src/verbs/info.py71
-rw-r--r--src/verbs/install.py471
-rw-r--r--src/verbs/remove.py63
-rw-r--r--src/verbs/search.py40
-rw-r--r--src/verbs/sync.py258
-rw-r--r--src/verbs/update.py25
9 files changed, 0 insertions, 1022 deletions
diff --git a/src/verbs/__init__.py b/src/verbs/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/src/verbs/__init__.py
+++ /dev/null
diff --git a/src/verbs/file.py b/src/verbs/file.py
deleted file mode 100644
index 008635f..0000000
--- a/src/verbs/file.py
+++ /dev/null
@@ -1,62 +0,0 @@
-import os
-import colors
-import util
-import shutil
-
-import re
-
-from verbs.sync import sync
-from verbs.search import list_repos
-
-# since we symlink /bin to /usr, we should make sure we are always looking for the same place
-def condition_file(file_path):
- file_path = re.sub("^/bin", "/usr/bin", file_path)
- file_path = re.sub("^/sbin", "/usr/bin", file_path)
- file_path = re.sub("^/usr/sbin", "/usr/bin", file_path)
- file_path = re.sub("^/lib", "/usr/lib", file_path)
- file_path = re.sub("^/lib64", "/usr/lib", file_path)
- file_path = re.sub("^/usr/lib64", "/usr/lib", file_path)
- return file_path
-
-def absolute_path(file_path, root="/"):
- if file_path[0] == "/":
- return file_path
- else:
- root_path = os.path.realpath(root)
- file_path = os.path.realpath(file_path)
- # this is a bad way of doing this
- file_path = file_path.replace(root_path, "")
- return file_path
-
-def list_files(package_name, config, root="/"):
- file_list = util.add_path(root, config["dir"]["installed"], package_name, "files")
- if os.path.exists(file_list):
- with open(file_list, "r") as file:
- return [condition_file(line.strip()) for line in file]
- else:
- return []
-
-def list_all_files(config, root="/"):
- packages = [ p.split("/")[-1] for p in list_repos(config["repos"], config["dir"]["packages"], config["dir"]["sources"])]
- file_list = {}
- for package in packages:
- file_list[package] = list_files(package, config, root=root)
- return file_list
-
-def file(args, options, config):
- if len(args) > 0:
- file_list = list_all_files(config, options["r"])
- for file in args:
- file = condition_file(absolute_path(file, options["r"]))
- found = False
- for package, files in file_list.items():
- if file in files:
- found = True
- print(colors.LIGHT_CYAN + file, colors.CYAN + "belongs to", colors.LIGHT_CYAN + package)
- break
- if not found:
- print(colors.RED + "Could not determine which package owns " + colors.LIGHT_CYAN + file)
-
-
- else:
- print(colors.LIGHT_RED + "Nothing to do")
diff --git a/src/verbs/files.py b/src/verbs/files.py
deleted file mode 100644
index 33936a9..0000000
--- a/src/verbs/files.py
+++ /dev/null
@@ -1,32 +0,0 @@
-import os
-import colors
-import util
-import shutil
-
-import re
-
-from verbs.sync import sync
-from verbs.search import list_repos
-from verbs.file import condition_file
-
-def list_files(package_name, config, root="/"):
- file_list = util.add_path(root, config["dir"]["installed"], package_name, "files")
- if os.path.exists(file_list):
- with open(file_list, "r") as file:
- return [condition_file(line.strip()) for line in file]
- else:
- return []
-
-def list_all_files(config, root="/"):
- packages = [ p.split("/")[-1] for p in list_repos(config["repos"], config["dir"]["packages"], config["dir"]["sources"])]
- file_list = {}
- for package in packages:
- file_list[package] = list_files(package, config, root=root)
- return file_list
-
-def files(args, options, config):
- if len(args) > 0:
- [print(f) for f in list_files(args[0], config, options["r"])]
-
- else:
- print(colors.LIGHT_RED + "Nothing to do")
diff --git a/src/verbs/info.py b/src/verbs/info.py
deleted file mode 100644
index 552df24..0000000
--- a/src/verbs/info.py
+++ /dev/null
@@ -1,71 +0,0 @@
-import os
-import colors
-import util
-import shutil
-
-from verbs.install import find_package, retrieve_package_info, is_installed
-from verbs.sync import sync
-
-def get_installed_info(package, config, options):
- installed_info = {}
-
- info_file = util.add_path(options["r"], config["dir"]["installed"], package, "info")
- with open(info_file, "r") as file:
- for line in file:
- line = line.strip()
- key = line.split("=")[0]
- value = "=".join(line.split("=")[1:])
-
- installed_info[key] = value
-
- return installed_info
-
-def package_info(package, config, options):
- checksum, sources, repo, size, files = find_package(package, config["repos"], config["dir"]["packages"], config["sources"])
-
- if not checksum is None:
- info = retrieve_package_info(
- sources, checksum, package, config,
- verbose=options["v"], skip_verification=options["u"]
- )
- installed = is_installed(package, config, options["r"])
- installed_info = get_installed_info(package, config, options) if installed else {}
-
- print(colors.CYAN + f"Information for {package}:")
- print(colors.CYAN + "\tName: " + colors.LIGHT_CYAN + f"{info['NAME']}")
- print(colors.CYAN + "\tDescription: " + colors.LIGHT_CYAN + f"{info['DESCRIPTION']}")
- print(colors.CYAN + "\tRepo: " + colors.LIGHT_CYAN + f"{repo}")
- print(colors.CYAN + "\tChecksum: " + colors.LIGHT_CYAN + f"{info['CHECKSUM']}")
- print(colors.CYAN + "\tVersion Hash: " + colors.LIGHT_CYAN + f"{info['VERSION']}")
- print(colors.CYAN + "\tBuild Date: " + colors.LIGHT_CYAN + f"{info['DATE']}")
- print(colors.CYAN + "\tSource: " + colors.LIGHT_CYAN + f"{info['SOURCE']}")
- print(colors.CYAN + "\tDependencies: " + colors.LIGHT_CYAN + f"{info['DEPS']}")
- print(colors.CYAN + "\tInstalled: " + colors.LIGHT_CYAN + f"{installed}")
-
- if installed:
- print(colors.CYAN + "\t\tDate: " + colors.LIGHT_CYAN + f"{installed_info['INSTALL_DATE']}")
- print(colors.CYAN + "\t\tChecksum: " + colors.LIGHT_CYAN + f"{installed_info['CHECKSUM']}")
- print(colors.CYAN + "\t\tURL: " + colors.LIGHT_CYAN + f"{installed_info['URL']}")
- print(colors.CYAN + "\t\tValidation Key: " + colors.LIGHT_CYAN + f"{installed_info['KEY']}")
- else:
- print(colors.RED + f"Package {package} could not be found")
-
-
-def info(args, options, config):
- if not options["l"]:
- sync(args, options, config)
-
- if len(args) == 0:
- installed_path = util.add_path(options["r"], config["dir"]["installed"])
- installed = os.listdir(installed_path)
- if len(installed) > 0:
- [args.append(i) for i in installed]
- else:
- print(colors.RED + f"No packages have been specified nor installed")
-
- for package in args:
- package_info(package, config, options)
-
-
-
-
diff --git a/src/verbs/install.py b/src/verbs/install.py
deleted file mode 100644
index 0690da3..0000000
--- a/src/verbs/install.py
+++ /dev/null
@@ -1,471 +0,0 @@
-import os
-import re
-import util
-import colors
-import time
-import requests
-import hashlib
-
-from verbs.sync import sync, run_post_install
-
-def get_best_source(available, sources_list="/var/lib/xipkg/sources"):
- source_speeds = {}
- with open(sources_list, "r") as file:
- for line in file.readlines():
- split = line.split(" ")
- if len(split) > 0:
- try:
- if split[0] in available:
- source_speeds[split[0]] = float(split[1])
- except:
- pass
-
- return sorted(source_speeds.keys(), key=lambda k: source_speeds[k])
-
-
-def find_package(query, repos, packages_dir, sources):
- for repo in repos:
- repo_dir = os.path.join(packages_dir, repo)
- files = os.listdir(repo_dir)
-
- if query in files:
- requested_repo = repo
- with open(os.path.join(repo_dir, query)) as file:
- checksum = file.readline().strip().split("=")[-1]
- size = file.readline().strip().split("=")[-1]
- filecount = file.readline().strip().split("=")[-1]
- listed_sources = file.readline().strip().split("=")[-1].split()
- found_sources = {
- source: util.add_path(url, repo)
- for source, url in sources.items()
- if source in listed_sources
- }
- return checksum, found_sources, requested_repo, int(size)*1000, int(filecount)
- return None, [], None, 0, 0
-
-
-def verify_signature(package_file, package_info,
- cache_dir="/var/cache/xipkg", keychain_dir="/var/lib/xipkg/keychain",
- verbose=False):
-
- checksum = package_info["CHECKSUM"]
-
- sig_cached_path = util.add_path(cache_dir, checksum + ".sig")
- with open(sig_cached_path, "wb") as file:
- file.write(package_info["SIGNATURE"])
-
- if os.path.exists(keychain_dir):
- keys = os.listdir(keychain_dir)
- for key in keys:
- key_path = util.add_path(keychain_dir, key)
-
- command = f"openssl dgst -verify {key_path} -signature {sig_cached_path} {package_file}"
-
- if "OK" in os.popen(command).read():
- return key
- elif verbose:
- print(colors.RED
- + f"Failed to verify signature against {key}"
- + colors.RESET)
-
- elif verbose:
- print(colors.BLACK + "There are no keys to verify with")
- return ""
-
-
-def retrieve_package_info(sources, checksum, package_name, config,
- verbose=False, skip_verification=False):
-
- sources_list=config["dir"]["sources"]
- cache_dir=config["dir"]["cache"]
-
- # TODO we may potentially do this a few times while resolving deps, might want to cache things here
- # TODO or find cached package checksum from the cache folder
- for source in get_best_source(sources, sources_list=sources_list):
- url = sources[source]
-
- package_info_url = util.add_path(url, package_name + ".xipkg.info")
- status, response = util.curl(package_info_url, raw=True)
-
- if status == 200:
- info = parse_package_info(response)
- if info["CHECKSUM"] == checksum or skip_verification:
- return info
- else:
- if verbose:
- print(colors.RED
- + f"Checksum verification failed for {package_name} in {source}"
- + colors.RESET)
- if verbose:
- print(colors.RED + f"No matching hashes found" + colors.RESET)
- return {}
-
-# Does not verify the package itself, will only blindly accept the best size it can
-def query_package_size(sources, package_info, package_name, config, verbose=False):
- sources_list=config["dir"]["sources"]
- for source in get_best_source(sources, sources_list=sources_list):
- url = sources[source]
- if verbose:
- print(colors.LIGHT_BLACK + f"using source {source} at {url} for {package_name}")
-
- package_url = util.add_path(url, package_name + ".xipkg")
- size = util.query_size(package_url)
- if size > 0:
- return size
- return 0
-
-def retrieve_package(sources, package_info, package_name, config, completed=0, total_download=-1,
- verbose=False, skip_verification=False):
-
- sources_list=config["dir"]["sources"]
- cache_dir=config["dir"]["cache"]
- keychain_dir=config["dir"]["keychain"]
-
- checksum = package_info["CHECKSUM"]
-
- for source in get_best_source(sources, sources_list=sources_list):
- url = sources[source]
- if verbose:
- print(colors.LIGHT_BLACK + f"using source {source} at {url}")
- package_url = util.add_path(url, package_name + ".xipkg")
- package_dir = util.add_path(cache_dir, source)
-
- util.mkdir(package_dir)
-
- if total_download == -1:
- text = package_name + ".xipkg"
- else:
- text = "packages..."
-
- # TODO if package already downloaded maybe just use cached version
- status, package_path, size = util.curl_to_file(package_url, util.add_path(package_dir, package_name + ".xipkg"),
- start=completed, total=total_download, text=text)
-
- if status == 200:
- downloaded_checksum = util.md5sum(package_path)
-
- if not skip_verification:
- if downloaded_checksum == checksum:
- sig = verify_signature(package_path, package_info,
- cache_dir=cache_dir, keychain_dir=keychain_dir, verbose=verbose)
- if len(sig) > 0:
- return package_path, source, sig, size
- elif verbose:
- print(colors.RED
- + f"Failed to verify signature for {package_name} in {source}"
- + colors.RESET)
- elif verbose:
- print(colors.RED
- + f"Checksum verification failed for {package_name} in {source}"
- + colors.RESET)
- else:
- return package_path, source, "none", size
- print(colors.RESET + colors.RED + f"No valid packages found for {package_name}" + colors.RESET)
- return "", "", "", 0
-
-def parse_package_info(packageinfo):
- info = {}
- lines = packageinfo.split(b"\n")
-
- index = 0
- while index < len(lines):
- line = lines[index]
- split = line.split(b"=")
- if len(split) > 1:
- if split[0] == b"SIGNATURE":
- index += 1
- digest = b"\n".join(lines[index:])
- info["SIGNATURE"] = digest
- break;
- else:
- info[str(split[0], "utf-8")] = str(b"=".join(split[1:]), "utf-8")
- index += 1
- return info
-
-def get_available_version(package_name, config, root="/"):
- repos = config["repos"]
- packages_dir = config["dir"]["packages"]
- sources = config["sources"]
- checksum, found_sources, requested_repo, size, files = find_package(package_name, repos, packages_dir, sources)
- return checksum
-
-def get_installed_version(package, config, root="/"):
-
- installed_info = util.add_path(root, config["dir"]["installed"], package, "info")
- if os.path.exists(installed_info):
- with open(installed_info) as file:
- for line in file:
- if line.startswith("CHECKSUM"):
- return line.strip().split("=")[-1]
- return None
-
-def update_needed(package, new_checksum, config, root="/"):
- version = get_installed_version(package, config, root)
- return not new_checksum == version
-
-def resolve_dependencies(package_info):
- d = [
- dep
- for dep in re.findall("[\w,-]*", package_info["DEPS"])
- if len(dep) > 0
- ]
- return d
-
-def find_all_dependencies(package_names, options, config):
- # this is all assuming that the order of deps installed doesn't matter
- failed = []
- to_check = [p for p in package_names]
- dependencies = {}
-
- while len(to_check) > 0:
- util.loading_bar(len(dependencies), len(dependencies) + len(to_check), "Resolving dependencies...")
- dep = to_check.pop()
-
- dep_checksum, dep_sources, dep_repo, size, files = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"])
-
- if dep_checksum is not None:
- dependencies[dep] = dep_checksum
-
- info = retrieve_package_info(
- dep_sources, dep_checksum, dep, config,
- verbose=options["v"], skip_verification=options["u"]
- )
-
- if len(info) > 0:
- [to_check.append(d) for d in resolve_dependencies(info) if not (d in dependencies or d in to_check)]
-
- else:
- if not dep in failed: failed.append(dep)
- if options["v"]:
- util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to retrieve info for {dep}")
- else:
- if not dep in failed: failed.append(dep)
- if options["v"]: util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to find package {dep}")
-
- util.loading_bar(len(dependencies), len(dependencies) + len(to_check), "Resolved dependencies")
- print(colors.RESET)
-
- to_install = []
- to_update = []
- for dep,checksum in dependencies.items():
- if not is_installed(dep, config, options["r"]):
- to_install.append(dep)
- elif update_needed(dep, checksum, config, options["r"]):
- to_update.append(dep)
-
- # assuming that the latter packages are core dependencies
- # we can reverse the array to reflect the more important packages to install
- to_install.reverse()
- to_update.reverse()
- return to_install, to_update, failed
-
-def is_installed(package_name, config, root="/"):
- installed_dir = util.add_path(root, config["dir"]["installed"])
- if os.path.exists(installed_dir):
- files = os.listdir(installed_dir)
- return package_name in files
- return False
-
-def install_package(package_name, package_path, package_info,
- repo, source_url, key, post_install,
- config, verbose=False, root="/"):
-
- # TODO loading bar here
- files = util.extract_tar(package_path, root)
- if post_install:
- run_post_install(config, verbose=verbose, root=root)
- save_installed_info(package_name, package_info, files, repo, source_url, key, config, root=root)
- return files
-
-
-
-def save_installed_info(package_name, package_info,
- files, repo, source_url, key,
- config, root=""):
- installed_dir = util.add_path(root, config["dir"]["installed"], package_name)
- util.mkdir(installed_dir)
-
- name = package_info["NAME"]
- description = package_info["DESCRIPTION"] if "DESCRIPTION" in package_info else ""
- installed_checksum = package_info["CHECKSUM"]
- build_date = package_info["DATE"]
- version = package_info["VERSION"]
- installed_date = os.popen("date").read()
-
- package_url = util.add_path(source_url, repo, package_name + ".xipkg")
-
- info_file = util.add_path(installed_dir, "info")
- with open(info_file, "w") as file:
- file.write(f"NAME={name}\n")
- file.write(f"DESCRIPTION={description}\n")
- file.write(f"CHECKSUM={installed_checksum}\n")
- file.write(f"VERSION={version}\n")
- file.write(f"INSTALL_DATE={installed_date}")
- file.write(f"BUILD_DATE={build_date}\n")
- file.write(f"KEY={key}\n")
- file.write(f"URL={package_url}\n")
- file.write(f"REPO={repo}\n")
-
- files_file = util.add_path(installed_dir, "files")
- with open(files_file, "w") as file:
- file.write(files)
-
-
-def install_single(package, options, config, post_install=True, verbose=False, unsafe=False):
- checksum, sources, repo, size, files = find_package(package, config["repos"],
- config["dir"]["packages"], config["sources"])
-
- info = retrieve_package_info(
- sources, checksum, package, config,
- verbose=verbose, skip_verification=unsafe
- )
-
- package_path, source, key = retrieve_package(sources,
- info, package, config,
- verbose=verbose, skip_verification=unsafe)
-
- files = install_package(package, package_path, info,
- repo, sources[source], key, post_install,
- config, verbose=verbose, root=options["r"])
-
-
-def install_multiple(to_install, args, options, config, terminology=("install", "installed", "installing")):
- v = options["v"]
- unsafe = options["u"]
-
- length = 0
- total_files = 0
- infos = []
- for package in to_install:
- util.loading_bar(len(infos), len(to_install), "Preparing Download")
- checksum, sources, repo, size, filecount = find_package(package, config["repos"],
- config["dir"]["packages"], config["sources"])
-
- if checksum != None:
- info = retrieve_package_info(
- sources, checksum, package, config,
- verbose=v, skip_verification=unsafe
- )
-
- # TODO make package_size be written in package info or sync list instead
- length += int(size)
- total_files += int(filecount)
-
- infos.append(
- (package, sources, repo, info)
- )
-
- divisor, unit = util.get_unit(length)
-
- util.loading_bar(len(infos), len(to_install), "Preparing Download")
- print(colors.RESET + colors.CLEAR_LINE, end="\r")
-
- print(colors.WHITE + "Total download size: " + colors.LIGHT_WHITE + str(round(length / divisor, 2)) + unit)
-
- if options["y"] or util.ask_confirmation(colors.WHITE + "Continue?"):
- # TODO try catch over each package in each stage so that we can know if there are errors
-
- downloaded = 0
- pkg_files = []
- for package_info in infos:
- (package, sources, repo, info) = package_info
-
- if options["v"]:
- print(colors.BLACK + f"Fetching {package}")
- package_path, source, key, size = retrieve_package(sources,
- info, package, config,
- completed=downloaded, total_download=length,
- verbose=v, skip_verification=unsafe)
-
- if package_path == "":
- print(colors.RED + f"Failed to download {package}")
- else:
- downloaded += size
-
- pkg_files.append(
- (package, package_path, sources[source], key, repo, info)
- )
-
- util.loading_bar(int(length/divisor), int(length/divisor), "Downloaded packages", unit=unit)
- print(colors.RESET)
-
- extracted = 0
- for f in pkg_files:
- util.loading_bar(extracted, total_files, terminology[2].capitalize() + " files")
-
- (package, package_path, source, key, repo, info) = f
-
- files = install_package(package, package_path, info,
- repo, source, key, options["r"] == "/",
- config, verbose=v, root=options["r"])
- extracted += len(files.split("\n"))
-
- util.loading_bar(extracted, total_files, terminology[1].capitalize() + " files")
- print(colors.RESET)
- else:
- print(colors.RED + "Action cancelled by user")
-
-
-def install(args, options, config):
- if not options["l"]:
- sync(args, options, config)
-
- sources = config["sources"]
- repos = config["repos"]
-
- v = options["v"]
- unsafe = options["u"]
-
- packages_dir = config["dir"]["packages"]
-
- # have some interaction with sudo when necessary rather than always require it
- # this check may need to be done sooner?
- if util.is_root() or options["r"] != "/":
- to_install, to_update, location_failed = args, [], []
- if options["n"]:
- for dep in to_install:
- dep_checksum, dep_sources, dep_repo, size, files = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"])
- if dep_checksum is None:
- to_install.remove(dep)
- location_failed.append(dep)
-
- else:
- to_install, to_update, location_failed = find_all_dependencies(args, options, config)
-
-
- if len(location_failed) > 0:
- print(colors.RED + "Failed to locate the following packages:")
- print(end="\t")
- for d in location_failed:
- print(colors.RED if d in args else colors.LIGHT_RED, d, end="")
- print()
-
- together = []
- [together.append(p) for p in to_install]
- [together.append(p) for p in to_update]
-
- if len(together) > 0:
-
- if len(to_install) > 0:
- print(colors.BLUE + f"The following will be installed:")
- print(end="\t")
- for d in to_install:
- print(colors.BLUE if d in args else colors.LIGHT_BLUE, d, end="")
- print()
- if len(to_update) > 0:
- print(colors.GREEN + f"The following will be updated:")
- print(end="\t")
- for d in to_update:
- print(colors.GREEN if d in args else colors.LIGHT_GREEN, d, end="")
- print()
-
- install_multiple(together, args, options, config)
- else:
- installed = " ".join([arg for arg in args
- if is_installed(arg, config, options["r"])])
- if len(installed) > 0:
- print(colors.CYAN + "Already installed", colors.LIGHT_CYAN + installed)
- else:
- print(colors.LIGHT_BLACK + "Nothing to do")
- else:
- print(colors.RED + "Root is required to install packages")
diff --git a/src/verbs/remove.py b/src/verbs/remove.py
deleted file mode 100644
index bc1a7e8..0000000
--- a/src/verbs/remove.py
+++ /dev/null
@@ -1,63 +0,0 @@
-import os
-import colors
-import util
-import shutil
-
-from verbs.sync import sync
-from verbs.install import is_installed
-
-BAR_COLOR = colors.BLACK + colors.BG_RED
-BAR_COLOR_RESET = colors.BG_BLACK + colors.RED
-
-def list_files(package_name, config, root="/"):
- file_list = util.add_path(root, config["dir"]["installed"], package_name, "files")
- with open(file_list, "r") as file:
- return [util.add_path(root, line.strip()) for line in file]
-
-def remove_package(package, options, config):
- if is_installed(package, config, options["r"]):
- files = list_files(package, config, options["r"])
- done = 0
- for file in files:
- util.loading_bar(done, len(files), f"Removing {package}", color=BAR_COLOR, reset=BAR_COLOR_RESET)
- if os.path.exists(file):
- os.remove(file)
- if options["v"]:
- print(colors.GREEN + f"{file} removed")
-
- # TODO delete the file's parent dirs if they are empty
- else:
- if options["v"]:
- print(colors.RED + f"{file} is missing: not removed!")
- done += 1
-
-
- installed_path = util.add_path(options["r"], config["dir"]["installed"], package)
- shutil.rmtree(installed_path)
- util.loading_bar(done, len(files), f"Removed {package}", color=BAR_COLOR, reset=BAR_COLOR_RESET)
- print()
- else:
- print(colors.LIGHT_RED + package + colors.RED + " is not installed")
-
-def remove(args, options, config):
- if not options["l"]:
- sync(args, options, config)
-
- # potential to find all the orphaned deps or something, but that would require knowing why someone installed a package, so you dont lose packages that you want
-
- uninstall = [package for package in args if is_installed(package, config, options["r"])]
- not_found = [package for package in args if not package in uninstall]
-
- if len(not_found) > 0:
- print(colors.RED + ", ".join(not_found), "are" if len(not_found) > 1 else "is", "not installed!")
- if len(uninstall) > 0:
- print(colors.CLEAR_LINE + colors.RESET, end="")
- print(colors.RED + "The following packages will be removed:")
- print(end="\t")
- for d in uninstall:
- print(colors.RED , d, end="")
- print()
-
- if util.ask_confirmation(colors.RED + "Continue?", no_confirm=options["y"]):
- for package in uninstall:
- remove_package(package, options, config)
diff --git a/src/verbs/search.py b/src/verbs/search.py
deleted file mode 100644
index 498a88e..0000000
--- a/src/verbs/search.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import os
-import sys
-import colors
-import util
-import shutil
-
-from verbs.install import find_package, retrieve_package_info
-from verbs.sync import sync
-
-def list_repos(repos, packages_dir, sources):
- return [
- f"{repo}/{file}" for repo in repos for file in os.listdir(os.path.join(packages_dir, repo))
- ]
-
-def search(args, options, config):
- if not options["l"]:
- sync(args, options, config)
-
- if len(args) > 0:
- packages = list_repos(config["repos"], config["dir"]["packages"], config["sources"])
- for package in args:
-
- # TODO fuzzy searching here
- results = [p for p in packages if package.lower() in p.lower()]
-
- if len(results) > 0:
- print(colors.GREEN + f"Search results for {package}:")
- for r in results:
- print(colors.LIGHT_GREEN + f"\t{r}")
-
- print(colors.RESET, end="")
- sys.exit(0)
- else:
- print(colors.RED + f"Package {package} could not be found")
- print(colors.RESET, end="")
- sys.exit(1)
- else:
- print(colors.LIGHT_RED + "Nothing to do")
-
-
diff --git a/src/verbs/sync.py b/src/verbs/sync.py
deleted file mode 100644
index b0210af..0000000
--- a/src/verbs/sync.py
+++ /dev/null
@@ -1,258 +0,0 @@
-import os
-import util
-import colors
-import shutil
-import time
-import sys
-
-CACHE_DIR = "/var/cache/xipkg"
-
-def run_post_install(config, verbose=False, root="/"):
- """ Scan and run postinstall scripts
-
- Args:
- config: (dict) The xipkg config
- verbose: (bool, optional) Whether to print debug messages
- root: (str) The system root to begin searching
- """
-
- if root == "/":
- installed_dir = util.add_path(root, config["dir"]["postinstall"])
- if os.path.exists(installed_dir):
- files = os.listdir(installed_dir)
- if len(files) > 0:
- done = 0
- for file in files:
- util.loading_bar(done, len(files), f"Running Postinstalls...")
- f = util.add_path(config["dir"]["postinstall"], file)
- command = f"sh {f}"
- os.chdir("/")
- os.system(command)
- os.remove(f)
- done += 1
- util.loading_bar(len(files), len(files), f"Run Postinstalls")
- print(colors.RESET)
-
-
-def list_packages(url):
- """ List all of the packages available in a repo
-
- Will return a parsed version of /packages.list and the time it took to retrieve this
-
- Args:
- url: (str) The repository's URL
-
- Returns:
- dict:
- A dictionary listing all packages and a string of their info summary
- example: {
- "linux" : "abcdef 100 200"
- }
- int:
- The time in milliseconds for the request to complete
- """
-
- start = time.time()
- status, response = util.curl(url + "/packages.list")
- duration = (time.time() - start) * 1000
-
- if status != 200:
- return {}, -1
-
- return {
- line.split()[0].split(".")[0]: " ".join(line.split()[1:])
- for line in response.split("\n") if len(line.split()) > 0
- }, (duration / len(response)) if len(response) > 0 else float('inf')
-
-
-def sync_packages(repo, sources, verbose=False):
- """
- Get a list of the versions available for all packages in a repo
-
- Args:
- repo: (str) The name of the repo to search
- sources: (dict) a dictionary of the sources and their urls
- verbose: (bool, optional) Whether to print debug messages
- Returns:
- dict:
- Versions of each available package
- """
-
- versions = {}
- speeds = {}
-
- for source,url in sources.items():
- listed, speed = list_packages(url + repo if url[-1] == "/" else f"/{repo}")
-
- if speed > 0: speeds[source] = speed
-
- if verbose:
- print(
- (colors.RED + f"No packages found in {source}/{repo}" + colors.RESET)
- if len(listed) == 0 else
- (colors.BLACK + f"{len(listed)} packages found in {source}/{repo}" + colors.RESET)
- )
-
- for p in listed:
- if not p in versions: versions[p] = []
- versions[p].append((listed[p], source))
-
- return versions, speeds
-
-def validate_package(package, versions, repo, verbose=False):
- popularity = {}
- for v in versions:
- info = v[0]
- source = v[1]
- if not info in popularity:
- popularity[info] = 0
- popularity[info] += 1
-
- most_popular = ""
- p_count = -1
- for p,c in popularity.items():
- if c > p_count:
- most_popular = p
- p_count = c
-
- sources = [v[1] for v in versions if v[0] == most_popular]
-
- # change the packages dict to list all the sources
- # maybe some validation here
- if len(most_popular.split()) > 2:
- info = {
- "checksum": most_popular.split()[0],
- "size": most_popular.split()[1],
- "files": most_popular.split()[2],
- "sources" : sources
- }
- else:
- info = {
- "checksum": most_popular.split()[0],
- "size": "0",
- "files": "0",
- "sources" : sources
- }
- return info
-
-def save_package(package, info, location):
- util.mkdir(location)
- package_file = os.path.join(location, package)
-
- exists = False
- if os.path.exists(package_file):
- with open(package_file, "r") as file:
- text = file.read()
- exists = info["checksum"] in text
-
- content = ""
- with open(package_file, "w") as file:
- file.write("checksum=" + info["checksum"] + "\n")
- file.write("size=" + info["size"] + "\n")
- file.write("files=" + info["files"] + "\n")
- file.write("sources=" + " ".join([source for source in info["sources"]]))
-
- return exists
-
-
-def test_source(source, url):
- # requesting a resource may not be the best way to do this, caching etc
- start = time.time()
- code, reponse = util.curl(util.add_path(url, "index.html"))
- if code == 200:
- return int((time.time() - start) * 1000)
- else:
- return -1
-
-def test_sources(sources, file_path, test_count=10):
- if test_count > 0:
- pings = {}
- checked = 0
- for source,url in sources.items():
- total = 0
- for i in range(test_count):
- total += test_source(source, url)
- util.loading_bar(checked, len(sources) * test_count, f"Pinging Sources")
- checked += 1
- if total > 0:
- pings[source] = int(total / test_count) if total > 0 else 0
-
-
- sorted(pings)
-
- with open(file_path, "w") as file:
- for source,ping in pings.items():
- file.write(f"{source} {ping}\n")
-
- util.loading_bar(checked, len(sources) * test_count, f"Pinged Sources")
- print()
-
-
-def sync(args, options, config):
- sources = config["sources"]
- repos = config["repos"]
-
- v = options["v"]
-
- new = 0
-
- run_post_install(config, verbose=options["v"], root=options["r"])
-
- for repo in repos:
- if v: print(colors.LIGHT_BLACK + f"downloading package lists for {repo}...")
-
- packages, speeds = sync_packages(repo, sources, verbose=v)
- if v: print(colors.LIGHT_BLACK + f"downloaded {len(packages)} packages from {len(sources)} sources")
-
- sorted(speeds)
- with open(config["dir"]["sources"], "w") as file:
- for source,ping in speeds.items():
- file.write(f"{source} {ping}\n")
-
- repo_dir = os.path.join(config["dir"]["packages"], repo)
- if os.path.exists(repo_dir):
- shutil.rmtree(repo_dir)
-
- # find the most popular hash to use
- done = 0
- total = len(packages.items())
- for package, versions in packages.items():
- info = validate_package(package, versions, repo, verbose=v)
- if not save_package(package, info, repo_dir):
- new += 1
- done += 1
- util.loading_bar(done, total, f"Syncing {repo}")
-
- util.loading_bar(total, total, f"Synced {repo}")
- print(colors.RESET)
-
- # this isnt new updates for install, this is new packages
- #if new > 0:
- # util.fill_line(f"There are {new} new updates", colors.LIGHT_GREEN)
-
-
-
-def import_key(name, url, config, verbose=False, root="/"):
- keychain_dir = util.add_path(root, config["dir"]["keychain"])
- util.mkdir(keychain_dir)
- key_path = os.path.join(keychain_dir, name + ".pub")
-
- if os.path.exists(key_path):
- print(colors.RED + f"Skipping existing key with name {name}")
- else:
- try:
- key_path = util.curl_to_file(url, key_path)
- print(colors.GREEN + f"Imported {name}.pub")
- except Exception as e:
- print(colors.RED + f"Failed to import key:", colors.RED + str(e))
-
-def keyimport(args, options, config):
- if len(args) > 1:
- alias = args[0]
- url = args[1]
-
- import_key(alias, url, config, verbose=options["v"], root=options["r"])
-
- else:
- print(colors.RED + "Usage: keyimport <alias> <url>")
-
diff --git a/src/verbs/update.py b/src/verbs/update.py
deleted file mode 100644
index 5b7a49f..0000000
--- a/src/verbs/update.py
+++ /dev/null
@@ -1,25 +0,0 @@
-import os
-import util
-import colors
-import time
-
-from verbs.install import find_package, install
-from verbs.sync import sync
-
-VERSION_COMPARED = "CHECKSUM"
-
-def get_installed_list(config, root="/"):
- installed_dir = util.add_path(root, config["dir"]["installed"])
- if os.path.exists(installed_dir):
- files = os.listdir(installed_dir)
- return files
- return []
-
-
-def update(args, options, config):
- if not options["l"]:
- sync(args, options, config)
-
- packages = [package for package in get_installed_list(config, options["r"]) if len(args) == 0 or package in args]
- options["l"] = True
- install(packages, options, config)