summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/options.py31
-rw-r--r--src/util.py53
-rw-r--r--src/verbs/install.py156
-rw-r--r--src/xi.py2
4 files changed, 187 insertions, 55 deletions
diff --git a/src/options.py b/src/options.py
index 499f9e7..011c039 100644
--- a/src/options.py
+++ b/src/options.py
@@ -61,27 +61,32 @@ def parse_args():
arg = args[index]
if len(arg) > 1 and arg[0] == "-":
- option = None
+ option = []
# is a named argument with a --
if arg[1] == "-" and len(arg) > 2 and arg[2:].split("=")[0] in names:
- option = names[arg[2:].split("=")[0]]
+ option.appen(names[arg[2:].split("=")[0]])
# is a single letter argument with a -
- elif arg[1] in options:
- option = arg[1]
else:
- parsed["args"].append(arg)
+ for letter in arg[1:]:
+ if letter in options:
+ option.append(letter)
+
+ if len(option) == 0:
+ parsed["args"].append(arg)
+
# add the option and any values ot the parsed dict
- if option is not None:
- if options[option]["flag"]:
- parsed[option] = True
- else:
- if "=" in arg:
- parsed[option] = arg.split("=")[1]
+ for opt in option:
+ if opt is not None:
+ if options[opt]["flag"]:
+ parsed[opt] = True
else:
- index += 1
- parsed[option] = args[index]
+ if "=" in arg:
+ parsed[opt] = arg.split("=")[1]
+ else:
+ index += 1
+ parsed[opt] = args[index]
else:
parsed["args"].append(arg)
diff --git a/src/util.py b/src/util.py
index 99bb82e..cef5529 100644
--- a/src/util.py
+++ b/src/util.py
@@ -8,8 +8,11 @@ import hashlib
DEFAULT_BAR_COLOR = colors.BLACK + colors.BG_CYAN
DEFAULT_BAR_COLOR_RESET = colors.BG_BLACK + colors.CYAN
-def add_path(a, b):
- return a + (b if a[-1] == "/" else f"/{b}")
+def add_path(*argv):
+ a = argv[0]
+ for b in argv[1:]:
+ a = a + (b if a[-1] == "/" else f"/{b}")
+ return a
def loading_bar(completed, total, text,
unit="", color=DEFAULT_BAR_COLOR, reset=DEFAULT_BAR_COLOR_RESET):
@@ -29,19 +32,57 @@ def loading_bar(completed, total, text,
def print_reset(text):
print(colors.RESET + text)
-def curl(url):
+def curl(url, raw=False):
try:
r = requests.get(url)
except:
return 500, ""
- return r.status_code, r.text
+ return r.status_code, r.content if raw else r.text
+
+def get_unit(n):
+ base = 1000
+ if n > base**4: return base**4, "TB"
+ elif n > base**3: return base**3, "GB"
+ elif n > base**2: return base**2, "MB"
+ elif n > base**1: return base**1, "KB"
+
+def curl_to_file(url, path, text=""):
+ with requests.get(url, stream=True) as r:
+ r.raise_for_status()
+ length = int(r.headers['content-length'])
+ with open(path, "wb") as f:
+
+ c_size = 4096
+ ic = r.iter_content(chunk_size=c_size)
+ done = 0
+
+ for chunk in ic:
+ if text:
+ divisor, unit = get_unit(length)
+ loading_bar(int(done/divisor), int(length/divisor), "Downloading " + text, unit=unit)
+
+ f.write(chunk)
+ done += c_size
+ if text:
+ divisor, unit = get_unit(length)
+ loading_bar(int(done/divisor), int(length/divisor), "Downloaded " + text, unit=unit)
+ print(colors.RESET)
+
+ return r.status_code, path
+
def mkdir(path):
if not os.path.exists(path):
os.makedirs(path)
-def md5sum(data):
- return hashlib.md5(data)
+def md5sum(filename):
+ md5_hash = hashlib.md5()
+
+ with open(filename,"rb") as f:
+ for byte_block in iter(lambda: f.read(4096),b""):
+ md5_hash.update(byte_block)
+
+ return md5_hash.hexdigest()
def ask_confirmation(text, default=True, no_confirm=False):
yes = "Y" if default else "y"
diff --git a/src/verbs/install.py b/src/verbs/install.py
index 2026ab4..b5555a7 100644
--- a/src/verbs/install.py
+++ b/src/verbs/install.py
@@ -3,7 +3,23 @@ import re
import util
import colors
import time
-
+import requests
+import hashlib
+
+def get_best_source(exclude=[], sources_list="/var/lib/xipkg/sources"):
+ # TODO implement exclude
+ source_speeds = {}
+ with open(sources_list, "r") as file:
+ for line in file.readlines():
+ split = line.split(" ")
+ if len(split) > 0:
+ try:
+ source_speeds[split[0]] = float(split[1])
+ except:
+ pass
+
+ return sorted(source_speeds.keys(), key=lambda k: source_speeds[k])
+
def find_package(query, repos, packages_dir, sources):
for repo in repos:
@@ -24,14 +40,44 @@ def find_package(query, repos, packages_dir, sources):
return None, [], None
-def retrieve_package_info(sources, checksum, package_name,
+def verify_signature(package_file, package_info,
+ cache_dir="/var/cache/xipkg", keychain_dir="/var/lib/xipkg/keychain",
+ verbose=False):
+
+ checksum = package_info["CHECKSUM"]
+
+ sig_cached_path = util.add_path(cache_dir, checksum + ".sig")
+ with open(sig_cached_path, "wb") as file:
+ file.write(package_info["SIGNATURE"])
+
+ keys = os.listdir(keychain_dir)
+ for key in keys:
+ key_path = util.add_path(keychain_dir, key)
+
+ command = f"openssl dgst -verify {key_path} -signature {sig_cached_path} {package_file}"
+
+ if "OK" in os.popen(command).read():
+ return True
+ elif verbose:
+ print(colors.RED
+ + f"Failed to verify signature against {key}"
+ + colors.RESET)
+
+ return False
+
+def retrieve_package_info(sources, checksum, package_name, config,
verbose=False, skip_verification=False):
+
+ sources_list=config["dir"]["sources"]
+ cache_dir=config["dir"]["cache"]
# TODO we may potentially do this a few times while resolving deps, might want to cache things here
# TODO actually use the ping times we made earlier to decide which source to pick
- for source,url in sources.items():
+ for source in get_best_source(sources_list=sources_list):
+ url = sources[source]
+
package_info_url = util.add_path(url, package_name + ".xipkg.info")
- status, response = util.curl(package_info_url)
+ status, response = util.curl(package_info_url, raw=True)
if status == 200:
info = parse_package_info(response)
@@ -46,40 +92,69 @@ def retrieve_package_info(sources, checksum, package_name,
print(colors.RED + f"No matching hashes found" + colors.RESET)
return {}
-def retrieve_package(sources, checksum, package_name,
+def retrieve_package(sources, package_info, package_name, config,
verbose=False, skip_verification=False):
+ sources_list=config["dir"]["sources"]
+ cache_dir=config["dir"]["cache"]
+ keychain_dir=config["dir"]["keychain"]
+
# TODO actually use the ping times we made earlier to decide which source to pick
# TODO actually save tar file, and add loading bar
- for source,url in sources.items():
- package_info_url = util.add_path(url, package_name + ".xipkg")
- status, response = util.curl(package_info_url)
-
+
+ checksum = package_info["CHECKSUM"]
+
+ for source in get_best_source(sources_list=sources_list):
+ url = sources[source]
+ package_url = util.add_path(url, package_name + ".xipkg")
+ package_dir = util.add_path(cache_dir, source)
+
+ util.mkdir(package_dir)
+ status, package_path = util.curl_to_file(package_url, util.add_path(package_dir, package_name + ".xipkg"), text=package_name + ".xipkg")
+
if status == 200:
- downloaded_checksum = util.md5sum(response)
- print(downloaded_checksum, "compared to requested", checksum)
- if downloaded_checksum == checksum or skip_verification:
- return reponse
+ downloaded_checksum = util.md5sum(package_path)
+
+ if not skip_verification:
+ if downloaded_checksum == checksum:
+
+ if verify_signature(package_path, package_info,
+ cache_dir=cache_dir, keychain_dir=keychain_dir, verbose=verbose):
+ return package_path
+ elif verbose:
+ print(colors.RED
+ + f"Failed to verify signature for {package_name} in {source}"
+ + colors.RESET)
+ elif verbose:
+ print(colors.RED
+ + f"Checksum verification failed for {package_name} in {source}"
+ + colors.RESET)
else:
- if verbose:
- print(colors.RED
- + f"Checksum verification failed for {package_name} in {source}"
- + colors.RESET)
+ return package_path
if verbose:
- print(colors.RED + f"No matching hashes found" + colors.RESET)
- return {}
+ print(colors.RED + f"No valid packages found" + colors.RESET)
+ return ""
def parse_package_info(packageinfo):
info = {}
+ lines = packageinfo.split(b"\n")
- for line in packageinfo.split("\n"):
- split = line.split("=")
+ index = 0
+ while index < len(lines):
+ line = lines[index]
+ split = line.split(b"=")
if len(split) > 1:
- info[split[0]] = "=".join(split[1:])
-
+ if split[0] == b"SIGNATURE":
+ index += 1
+ digest = b"\n".join(lines[index:])
+ info["SIGNATURE"] = digest
+ break;
+ else:
+ info[str(split[0], "utf-8")] = str(b"=".join(split[1:]), "utf-8")
+ index += 1
return info
-def resolve_dependencies(package_info, config):
+def resolve_dependencies(package_info):
getpkgs = lambda deps: re.findall("\w*", deps)
deps = getpkgs(package_info["DEPS"])
@@ -101,20 +176,20 @@ def find_all_dependencies(package_names, options, config):
dep_checksum, dep_sources, dep_repo = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"])
if dep_checksum is not None:
info = retrieve_package_info(
- dep_sources, dep_checksum, dep,
+ dep_sources, dep_checksum, dep, config,
verbose=options["v"], skip_verification=options["u"]
)
if len(info) > 0:
- all_deps.append(dep)
- deps = resolve_dependencies(info, config)
- for dep in deps:
- if not dep in all_deps:
-
- if is_installed(dep, config):
- print(colors.YELLOW + f"Package {query} has already been installed")
- else:
- to_check.append(dep)
+ if not dep in all_deps:
+ all_deps.append(dep)
+ deps = resolve_dependencies(info)
+ for dep in deps:
+ if not dep in all_deps:
+ if is_installed(dep, config):
+ print(colors.YELLOW + f"Package {query} has already been installed")
+ else:
+ to_check.append(dep)
else:
if options["v"]:
util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to retrieve info for {query}")
@@ -154,7 +229,18 @@ def install(args, options, config):
print()
if util.ask_confirmation(colors.BLUE + "Continue?", no_confirm=options["y"]):
- print("installed")
+
+ for package in to_install:
+ checksum, sources, repo = find_package(package, config["repos"],
+ config["dir"]["packages"], config["sources"])
+
+ info = retrieve_package_info(
+ sources, checksum, package, config,
+ verbose=v, skip_verification=unsafe
+ )
+
+ retrieve_package(sources, info, package, config,
+ verbose=v, skip_verification=unsafe)
else:
print(colors.RED + "Action cancelled by user")
else:
diff --git a/src/xi.py b/src/xi.py
index d00c2df..f6089ec 100644
--- a/src/xi.py
+++ b/src/xi.py
@@ -20,7 +20,7 @@ verbs = { v: globals()[v] for v in [
def main():
opts = options.parse_args()
args = opts["args"]
-
+
if opts["h"]:
options.print_usage()
return