summaryrefslogtreecommitdiff
path: root/src/verbs/install.py
diff options
context:
space:
mode:
authordavidovski <david@davidovski.xyz>2021-11-21 17:29:14 +0000
committerdavidovski <david@davidovski.xyz>2021-11-21 17:29:14 +0000
commitadb73c8b36a4145356477d3fd41cf70a3ae5fe71 (patch)
treef1c66abd4001efeb86fa69c9ea71f600cddc71c3 /src/verbs/install.py
parent05ab891879f9d3e466d6d563a545ab22f4b258a1 (diff)
added signature verification and best source selection
Diffstat (limited to 'src/verbs/install.py')
-rw-r--r--src/verbs/install.py156
1 files changed, 121 insertions, 35 deletions
diff --git a/src/verbs/install.py b/src/verbs/install.py
index 2026ab4..b5555a7 100644
--- a/src/verbs/install.py
+++ b/src/verbs/install.py
@@ -3,7 +3,23 @@ import re
import util
import colors
import time
-
+import requests
+import hashlib
+
+def get_best_source(exclude=[], sources_list="/var/lib/xipkg/sources"):
+ # TODO implement exclude
+ source_speeds = {}
+ with open(sources_list, "r") as file:
+ for line in file.readlines():
+ split = line.split(" ")
+ if len(split) > 0:
+ try:
+ source_speeds[split[0]] = float(split[1])
+ except:
+ pass
+
+ return sorted(source_speeds.keys(), key=lambda k: source_speeds[k])
+
def find_package(query, repos, packages_dir, sources):
for repo in repos:
@@ -24,14 +40,44 @@ def find_package(query, repos, packages_dir, sources):
return None, [], None
-def retrieve_package_info(sources, checksum, package_name,
+def verify_signature(package_file, package_info,
+ cache_dir="/var/cache/xipkg", keychain_dir="/var/lib/xipkg/keychain",
+ verbose=False):
+
+ checksum = package_info["CHECKSUM"]
+
+ sig_cached_path = util.add_path(cache_dir, checksum + ".sig")
+ with open(sig_cached_path, "wb") as file:
+ file.write(package_info["SIGNATURE"])
+
+ keys = os.listdir(keychain_dir)
+ for key in keys:
+ key_path = util.add_path(keychain_dir, key)
+
+ command = f"openssl dgst -verify {key_path} -signature {sig_cached_path} {package_file}"
+
+ if "OK" in os.popen(command).read():
+ return True
+ elif verbose:
+ print(colors.RED
+ + f"Failed to verify signature against {key}"
+ + colors.RESET)
+
+ return False
+
+def retrieve_package_info(sources, checksum, package_name, config,
verbose=False, skip_verification=False):
+
+ sources_list=config["dir"]["sources"]
+ cache_dir=config["dir"]["cache"]
# TODO we may potentially do this a few times while resolving deps, might want to cache things here
# TODO actually use the ping times we made earlier to decide which source to pick
- for source,url in sources.items():
+ for source in get_best_source(sources_list=sources_list):
+ url = sources[source]
+
package_info_url = util.add_path(url, package_name + ".xipkg.info")
- status, response = util.curl(package_info_url)
+ status, response = util.curl(package_info_url, raw=True)
if status == 200:
info = parse_package_info(response)
@@ -46,40 +92,69 @@ def retrieve_package_info(sources, checksum, package_name,
print(colors.RED + f"No matching hashes found" + colors.RESET)
return {}
-def retrieve_package(sources, checksum, package_name,
+def retrieve_package(sources, package_info, package_name, config,
verbose=False, skip_verification=False):
+ sources_list=config["dir"]["sources"]
+ cache_dir=config["dir"]["cache"]
+ keychain_dir=config["dir"]["keychain"]
+
# TODO actually use the ping times we made earlier to decide which source to pick
# TODO actually save tar file, and add loading bar
- for source,url in sources.items():
- package_info_url = util.add_path(url, package_name + ".xipkg")
- status, response = util.curl(package_info_url)
-
+
+ checksum = package_info["CHECKSUM"]
+
+ for source in get_best_source(sources_list=sources_list):
+ url = sources[source]
+ package_url = util.add_path(url, package_name + ".xipkg")
+ package_dir = util.add_path(cache_dir, source)
+
+ util.mkdir(package_dir)
+ status, package_path = util.curl_to_file(package_url, util.add_path(package_dir, package_name + ".xipkg"), text=package_name + ".xipkg")
+
if status == 200:
- downloaded_checksum = util.md5sum(response)
- print(downloaded_checksum, "compared to requested", checksum)
- if downloaded_checksum == checksum or skip_verification:
- return reponse
+ downloaded_checksum = util.md5sum(package_path)
+
+ if not skip_verification:
+ if downloaded_checksum == checksum:
+
+ if verify_signature(package_path, package_info,
+ cache_dir=cache_dir, keychain_dir=keychain_dir, verbose=verbose):
+ return package_path
+ elif verbose:
+ print(colors.RED
+ + f"Failed to verify signature for {package_name} in {source}"
+ + colors.RESET)
+ elif verbose:
+ print(colors.RED
+ + f"Checksum verification failed for {package_name} in {source}"
+ + colors.RESET)
else:
- if verbose:
- print(colors.RED
- + f"Checksum verification failed for {package_name} in {source}"
- + colors.RESET)
+ return package_path
if verbose:
- print(colors.RED + f"No matching hashes found" + colors.RESET)
- return {}
+ print(colors.RED + f"No valid packages found" + colors.RESET)
+ return ""
def parse_package_info(packageinfo):
info = {}
+ lines = packageinfo.split(b"\n")
- for line in packageinfo.split("\n"):
- split = line.split("=")
+ index = 0
+ while index < len(lines):
+ line = lines[index]
+ split = line.split(b"=")
if len(split) > 1:
- info[split[0]] = "=".join(split[1:])
-
+ if split[0] == b"SIGNATURE":
+ index += 1
+ digest = b"\n".join(lines[index:])
+ info["SIGNATURE"] = digest
+ break;
+ else:
+ info[str(split[0], "utf-8")] = str(b"=".join(split[1:]), "utf-8")
+ index += 1
return info
-def resolve_dependencies(package_info, config):
+def resolve_dependencies(package_info):
getpkgs = lambda deps: re.findall("\w*", deps)
deps = getpkgs(package_info["DEPS"])
@@ -101,20 +176,20 @@ def find_all_dependencies(package_names, options, config):
dep_checksum, dep_sources, dep_repo = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"])
if dep_checksum is not None:
info = retrieve_package_info(
- dep_sources, dep_checksum, dep,
+ dep_sources, dep_checksum, dep, config,
verbose=options["v"], skip_verification=options["u"]
)
if len(info) > 0:
- all_deps.append(dep)
- deps = resolve_dependencies(info, config)
- for dep in deps:
- if not dep in all_deps:
-
- if is_installed(dep, config):
- print(colors.YELLOW + f"Package {query} has already been installed")
- else:
- to_check.append(dep)
+ if not dep in all_deps:
+ all_deps.append(dep)
+ deps = resolve_dependencies(info)
+ for dep in deps:
+ if not dep in all_deps:
+ if is_installed(dep, config):
+ print(colors.YELLOW + f"Package {query} has already been installed")
+ else:
+ to_check.append(dep)
else:
if options["v"]:
util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to retrieve info for {query}")
@@ -154,7 +229,18 @@ def install(args, options, config):
print()
if util.ask_confirmation(colors.BLUE + "Continue?", no_confirm=options["y"]):
- print("installed")
+
+ for package in to_install:
+ checksum, sources, repo = find_package(package, config["repos"],
+ config["dir"]["packages"], config["sources"])
+
+ info = retrieve_package_info(
+ sources, checksum, package, config,
+ verbose=v, skip_verification=unsafe
+ )
+
+ retrieve_package(sources, info, package, config,
+ verbose=v, skip_verification=unsafe)
else:
print(colors.RED + "Action cancelled by user")
else: