author    davidovski <david@davidovski.xyz>  2022-01-07 22:29:50 +0000
committer davidovski <david@davidovski.xyz>  2022-01-07 22:29:50 +0000
commit    7800fd6c3cc203f11f6132ec2592ab33badf6331 (patch)
tree      201aeeaed6e9de530758cb022d1a07ec25a5b2d5 /src/verbs/install.py
parent    954aac2b58f459b0bced68dc87608b6ba6ed45bb (diff)
grouped together loading bars and merged file sizes into the sync phase
Diffstat (limited to 'src/verbs/install.py')
-rw-r--r--  src/verbs/install.py  183
1 file changed, 135 insertions(+), 48 deletions(-)
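
The commit replaces the per-package download bars with a single bar that spans the whole sync phase: install() pre-computes the total download size from the repo index, and each retrieve_package() call advances the shared bar by the bytes it fetched. A minimal, self-contained sketch of that accounting idea (loading_bar and download_all are illustrative stand-ins, not xipkg's util functions):

    # One shared progress bar for the whole sync phase: each finished package
    # advances the bar by its size instead of rendering its own bar.
    def loading_bar(done, total, text, width=40):
        filled = int(width * done / total) if total else width
        print(f"\r[{'#' * filled}{'.' * (width - filled)}] {text}", end="", flush=True)

    def download_all(packages):
        # packages: list of (name, size_in_bytes, fetch_callable) tuples
        total = sum(size for _, size, _ in packages)
        completed = 0
        for name, size, fetch in packages:
            fetch()                    # the actual download for this package
            completed += size          # credit this package's bytes to the shared bar
            loading_bar(completed, total, "packages...")
        print()
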
diff --git a/src/verbs/install.py b/src/verbs/install.py
index 60e1809..9832e2b 100644
--- a/src/verbs/install.py
+++ b/src/verbs/install.py
@@ -32,14 +32,17 @@ def find_package(query, repos, packages_dir, sources):
requested_repo = repo
with open(os.path.join(repo_dir, query)) as file:
checksum = file.readline().strip().split("=")[-1]
+ size = file.readline().strip().split("=")[-1]
+ filecount = file.readline().strip().split("=")[-1]
listed_sources = file.readline().strip().split("=")[-1].split()
found_sources = {
source: util.add_path(url, repo)
for source, url in sources.items()
if source in listed_sources
}
- return checksum, found_sources, requested_repo
- return None, [], None
+ return checksum, found_sources, requested_repo, int(size)*1000, int(filecount)
+ return None, [], None, 0, 0
+
def verify_signature(package_file, package_info,
cache_dir="/var/cache/xipkg", keychain_dir="/var/lib/xipkg/keychain",
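
find_package() now reads two extra lines (size and filecount) from each per-package index file before the source list. The field names are not visible in this diff, so the entry below is only a hypothetical example; the KEY=value shape, the line order, and the size scaling are what the readline() calls above imply:

    # Hypothetical index entry (field names assumed):
    #
    #   CHECKSUM=0f343b0931126a20f133d67c2b018a3b
    #   SIZE=412
    #   FILECOUNT=37
    #   SOURCES=main mirror1
    #
    # Stand-alone version of the same parsing logic:
    def parse_index_entry(path):
        with open(path) as file:
            checksum = file.readline().strip().split("=")[-1]
            size = file.readline().strip().split("=")[-1]
            filecount = file.readline().strip().split("=")[-1]
            sources = file.readline().strip().split("=")[-1].split()
        # the diff multiplies size by 1000, suggesting the index stores kilobytes
        return checksum, int(size) * 1000, int(filecount), sources
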
@@ -69,6 +72,7 @@ def verify_signature(package_file, package_info,
print(colors.BLACK + "There are no keys to verify with")
return ""
+
def retrieve_package_info(sources, checksum, package_name, config,
verbose=False, skip_verification=False):
@@ -76,7 +80,6 @@ def retrieve_package_info(sources, checksum, package_name, config,
cache_dir=config["dir"]["cache"]
# TODO we may potentially do this a few times while resolving deps, might want to cache things here
-
# TODO or find cached package checksum from the cache folder
for source in get_best_source(sources, sources_list=sources_list):
url = sources[source]
@@ -97,7 +100,21 @@ def retrieve_package_info(sources, checksum, package_name, config,
print(colors.RED + f"No matching hashes found" + colors.RESET)
return {}
-def retrieve_package(sources, package_info, package_name, config,
+# Does not verify the package itself, will only blindly accept the best size it can
+def query_package_size(sources, package_info, package_name, config, verbose=False):
+    sources_list=config["dir"]["sources"]
+    for source in get_best_source(sources, sources_list=sources_list):
+        url = sources[source]
+        if verbose:
+            print(colors.LIGHT_BLACK + f"using source {source} at {url} for {package_name}")
+
+        package_url = util.add_path(url, package_name + ".xipkg")
+        size = util.query_size(package_url)
+        if size > 0:
+            return size
+    return 0
+
+def retrieve_package(sources, package_info, package_name, config, completed=0, total_download=-1,
verbose=False, skip_verification=False):
sources_list=config["dir"]["sources"]
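
query_package_size() leans on util.query_size(), which is not part of this diff. A hedged sketch of what such a helper typically looks like (an assumption about util, not its actual code): issue a HEAD request and fall back to 0 so the caller can try the next source:

    import urllib.request

    def query_size(url, timeout=10):
        # Ask the server for the Content-Length without downloading the body;
        # return 0 on any failure so callers can move on to another source.
        try:
            request = urllib.request.Request(url, method="HEAD")
            with urllib.request.urlopen(request, timeout=timeout) as response:
                return int(response.headers.get("Content-Length", 0))
        except (OSError, ValueError):
            return 0
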
@@ -114,8 +131,15 @@ def retrieve_package(sources, package_info, package_name, config,
package_dir = util.add_path(cache_dir, source)
util.mkdir(package_dir)
- # TODO if exists maybe just use cached version
- status, package_path = util.curl_to_file(package_url, util.add_path(package_dir, package_name + ".xipkg"), text=package_name + ".xipkg")
+
+ if total_download == -1:
+ text = package_name + ".xipkg"
+ else:
+ text = "packages..."
+
+ # TODO if package already downloaded maybe just use cached version
+ status, package_path, size = util.curl_to_file(package_url, util.add_path(package_dir, package_name + ".xipkg"),
+ start=completed, total=total_download, text=text)
if status == 200:
downloaded_checksum = util.md5sum(package_path)
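
The new start/total/text parameters are what merge the per-package bars: each call reports progress offset by the bytes already downloaded for earlier packages. util.curl_to_file itself is not shown here, so this is only a sketch of the assumed interface, returning (status, path, size) as the diff unpacks it:

    import urllib.request

    def curl_to_file(url, path, start=0, total=-1, text="", chunk=64 * 1024):
        # Stream the file to disk; when a grand total is known, report progress
        # offset by `start` (bytes already downloaded for earlier packages).
        written = 0
        with urllib.request.urlopen(url) as response, open(path, "wb") as out:
            while True:
                data = response.read(chunk)
                if not data:
                    break
                out.write(data)
                written += len(data)
                if total > 0:
                    percent = 100 * (start + written) // total
                    print(f"\r{text} {percent}%", end="", flush=True)
            return response.status, path, written
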
@@ -125,8 +149,7 @@ def retrieve_package(sources, package_info, package_name, config,
sig = verify_signature(package_path, package_info,
cache_dir=cache_dir, keychain_dir=keychain_dir, verbose=verbose)
if len(sig) > 0:
- print(colors.RESET)
- return package_path, source, sig
+ return package_path, source, sig, size
elif verbose:
print(colors.RED
+ f"Failed to verify signature for {package_name} in {source}"
@@ -136,8 +159,7 @@ def retrieve_package(sources, package_info, package_name, config,
+ f"Checksum verification failed for {package_name} in {source}"
+ colors.RESET)
else:
- print(colors.RESET)
- return package_path, source, "none"
+ return package_path, source, "none", size
print(colors.RESET + colors.RED + f"No valid packages found for {package_name}" + colors.RESET)
return ""
@@ -171,15 +193,13 @@ def find_all_dependencies(package_names, options, config):
# this is all assuming that the order of deps installed doesn't matter
to_check = [p for p in package_names]
all_deps = []
+ failed = []
while len(to_check) > 0:
util.loading_bar(len(all_deps), len(all_deps) + len(to_check), "Resolving dependencies...")
dep = to_check.pop()
- # probably better way to implement this obligatory wildcard
- # 100% sure there is a better way of doing this than installing all packages from a repo
- # maybe some sort of package grouping (or empty package with deps on all needed)
- dep_checksum, dep_sources, dep_repo = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"])
+ dep_checksum, dep_sources, dep_repo, size, files = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"])
if dep_checksum is not None:
info = retrieve_package_info(
@@ -191,15 +211,18 @@ def find_all_dependencies(package_names, options, config):
if not dep in all_deps:
all_deps.append(dep)
deps = resolve_dependencies(info)
- for dep in deps:
- if not dep in all_deps:
- if is_installed(dep, config, options["r"]):
+ for d in deps:
+ if not d in all_deps:
+ if is_installed(d, config, options["r"]):
if options["v"]: print(colors.YELLOW + f"Package {dep} has already been installed")
else:
- to_check.append(dep)
- elif options["v"]:
- util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to retrieve info for {dep}")
+ to_check.append(d)
+ else:
+ if not dep in failed: failed.append(dep)
+ if options["v"]:
+ util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to retrieve info for {dep}")
else:
+ if not dep in failed: failed.append(dep)
if options["v"]: util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to find package {dep}")
if len(all_deps) > 0:
@@ -209,7 +232,7 @@ def find_all_dependencies(package_names, options, config):
# assuming that the latter packages are core dependencies
# we can reverse the array to reflect the more important packages to install
all_deps.reverse()
- return all_deps
+ return all_deps, failed
def is_installed(package_name, config, root="/"):
installed_dir = util.add_path(root, config["dir"]["installed"])
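
The comments in find_all_dependencies() explain the ordering assumption: packages are discovered requested-packages-first, so reversing the list at the end puts the deepest (most depended-upon) packages at the front of the install order. A toy illustration with a hypothetical deps_of map, not xipkg's data:

    def resolve(requested, deps_of):
        # Depth-first discovery from the requested packages, then reverse so
        # that core dependencies come before the packages that need them.
        to_check, ordered = list(requested), []
        while to_check:
            pkg = to_check.pop()
            if pkg not in ordered:
                ordered.append(pkg)
                to_check.extend(d for d in deps_of.get(pkg, []) if d not in ordered)
        ordered.reverse()
        return ordered

    # resolve(["app"], {"app": ["libfoo"], "libfoo": ["musl"]})  ->  ["musl", "libfoo", "app"]
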
@@ -227,6 +250,7 @@ def install_package(package_name, package_path, package_info,
if post_install:
run_post_install(config, verbose=verbose, root=root)
save_installed_info(package_name, package_info, files, repo, source_url, key, config, root=root)
+ return files
@@ -261,7 +285,20 @@ def save_installed_info(package_name, package_info,
with open(files_file, "w") as file:
file.write(files)
- pass
+
+def run_post_install(config, verbose=False, root="/"):
+ installed_dir = util.add_path(root, config["dir"]["postinstall"])
+ if os.path.exists(installed_dir):
+ files = os.listdir(installed_dir)
+ for file in files:
+ f = util.add_path(config["dir"]["postinstall"], file)
+ command = f"sh {f}"
+ if root != "/":
+ os.chroot(root)
+ os.chdir("/")
+ os.system(command)
+ os.remove(f)
+
def install_single(package, options, config, post_install=True, verbose=False, unsafe=False):
checksum, sources, repo = find_package(package, config["repos"],
@@ -280,18 +317,6 @@ def install_single(package, options, config, post_install=True, verbose=False, unsafe=False):
repo, sources[source], key, post_install,
config, verbose=verbose, root=options["r"])
-def run_post_install(config, verbose=False, root="/"):
- installed_dir = util.add_path(root, config["dir"]["postinstall"])
- if os.path.exists(installed_dir):
- files = os.listdir(installed_dir)
- for file in files:
- f = util.add_path(config["dir"]["postinstall"], file)
- command = f"sh {f}"
- if root != "/":
- os.chroot(root)
- os.chdir("/")
- os.system(command)
- os.remove(f)
def install(args, options, config):
if not options["l"]:
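
The install() hunk below sizes the confirmation prompt and the final bar with util.get_unit(), which returns a (divisor, unit) pair so that round(length / divisor, 2) reads naturally. The helper is not in this diff; a plausible sketch of that contract:

    def get_unit(size_in_bytes):
        # Pick the largest unit that keeps the number >= 1, returning the
        # divisor alongside its label (assumed behaviour of util.get_unit).
        for divisor, unit in ((10**9, "GB"), (10**6, "MB"), (10**3, "kB")):
            if size_in_bytes >= divisor:
                return divisor, unit
        return 1, "B"

    # divisor, unit = get_unit(1_250_000)   # -> (1000000, "MB"), i.e. 1.25MB when rounded
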
@@ -308,27 +333,89 @@ def install(args, options, config):
# have some interaction with sudo when necessary rather than always require it
# this check may need to be done sooner?
if util.is_root() or options["r"] != "/":
- to_install = args if options["n"] else find_all_dependencies(args, options, config)
+ to_install, location_failed = args, []
+ if not options["n"]:
+ to_install, location_failed = find_all_dependencies(args, options, config)
- if len(to_install) > 0:
- print(colors.CLEAR_LINE + colors.RESET, end="")
- print(colors.BLUE + "The following packages will be installed:")
+ if len(location_failed) > 0:
+ print(colors.LIGHT_RED + "Failed to locate the following packages:")
print(end="\t")
- for d in to_install:
- print(colors.BLUE if d in args else colors.LIGHT_BLUE, d, end="")
+ for d in location_failed:
+ print(colors.RED if d in args else colors.LIGHT_RED, d, end="")
print()
- if util.ask_confirmation(colors.BLUE + "Continue?", no_confirm=options["y"]):
+ if len(to_install) > 0:
+ length = 0
+ total_files = 0
+ infos = []
+ for package in to_install:
+ util.loading_bar(len(infos), len(to_install), "Gathering package infos")
+ checksum, sources, repo, size, filecount = find_package(package, config["repos"],
+ config["dir"]["packages"], config["sources"])
+
+ if checksum != None:
+ info = retrieve_package_info(
+ sources, checksum, package, config,
+ verbose=v, skip_verification=unsafe
+ )
+
+ # TODO make package_size be written in package info or sync list instead
+ length += int(size)
+ total_files += int(filecount)
+
+ infos.append(
+ (package, sources, repo, info)
+ )
+
+ divisor, unit = util.get_unit(length)
+
+ util.loading_bar(len(infos), len(to_install), "Gathered package infos")
+ print(colors.RESET)
+
+ if not options["y"]:
+ print(colors.BLUE + "The following packages will be installed:")
+ print(end="\t")
+ for d in to_install:
+ print(colors.BLUE if d in args else colors.LIGHT_BLUE, d, end="")
+ print()
+
+ print(colors.BLUE + "Total download size: " + colors.LIGHT_BLUE + str(round(length / divisor, 2)) + unit)
+
+ if options["y"] or util.ask_confirmation(colors.BLUE + "Continue?"):
+ # TODO try catch over each package in each stage so that we can know if there are errors
+
+ downloaded = 0
+ pkg_files = []
+ for package_info in infos:
+ (package, sources, repo, info) = package_info
+
+ package_path, source, key, size = retrieve_package(sources,
+ info, package, config,
+ completed=downloaded, total_download=length,
+ verbose=v, skip_verification=unsafe)
+
+ downloaded += size
+
+ pkg_files.append(
+ (package, package_path, sources[source], key, repo, info)
+ )
+
+ util.loading_bar(int(length/divisor), int(length/divisor), "Downloaded packages", unit=unit)
+ print(colors.RESET)
- for package in to_install:
- try:
- install_single(package, options, config, verbose=v, unsafe=unsafe)
- util.fill_line(f"Installed {package}", colors.BG_CYAN + colors.LIGHT_BLACK, end="\n")
- except Exception as e:
- util.fill_line(f"Failed to install {package}", colors.BG_RED + colors.LIGHT_BLACK, end="\n")
- util.fill_line(str(e), colors.CLEAR_LINE + colors.RESET + colors.RED, end="\n")
+ extracted = 0
+ for f in pkg_files:
+ util.loading_bar(extracted, total_files, "Installing files")
+ (package, package_path, source, key, repo, info) = f
+ files = install_package(package, package_path, info,
+ repo, source, key, True,
+ config, verbose=v, root=options["r"])
+ extracted += len(files.split("\n"))
+
+ util.loading_bar(extracted, total_files, "Installed files")
+ print(colors.RESET)
else:
print(colors.RED + "Action cancelled by user")
else: