diff options
| -rw-r--r-- | src/options.py | 31 | ||||
| -rw-r--r-- | src/util.py | 53 | ||||
| -rw-r--r-- | src/verbs/install.py | 156 | ||||
| -rw-r--r-- | src/xi.py | 2 | ||||
| -rw-r--r-- | xipkg.conf | 1 | 
5 files changed, 188 insertions, 55 deletions
| diff --git a/src/options.py b/src/options.py index 499f9e7..011c039 100644 --- a/src/options.py +++ b/src/options.py @@ -61,27 +61,32 @@ def parse_args():          arg = args[index]          if len(arg) > 1 and arg[0] == "-": -            option = None +            option = []              # is a named argument with a --              if arg[1] == "-" and len(arg) > 2 and arg[2:].split("=")[0] in names: -                option = names[arg[2:].split("=")[0]] +                option.appen(names[arg[2:].split("=")[0]])              # is a single letter argument with a - -            elif arg[1] in options: -                option = arg[1]              else: -                parsed["args"].append(arg) +                for letter in arg[1:]: +                    if letter in options: +                        option.append(letter) + +                if len(option) == 0: +                    parsed["args"].append(arg) +              # add the option and any values ot the parsed dict -            if option is not None: -                if options[option]["flag"]: -                    parsed[option] = True -                else: -                    if "=" in arg: -                        parsed[option] = arg.split("=")[1] +            for opt in option: +                if opt is not None: +                    if options[opt]["flag"]: +                        parsed[opt] = True                      else: -                        index += 1 -                        parsed[option] = args[index] +                        if "=" in arg: +                            parsed[opt] = arg.split("=")[1] +                        else: +                            index += 1 +                            parsed[opt] = args[index]          else:              parsed["args"].append(arg) diff --git a/src/util.py b/src/util.py index 99bb82e..cef5529 100644 --- a/src/util.py +++ b/src/util.py @@ -8,8 +8,11 @@ import hashlib  DEFAULT_BAR_COLOR = colors.BLACK + colors.BG_CYAN  DEFAULT_BAR_COLOR_RESET = colors.BG_BLACK + colors.CYAN -def add_path(a, b): -    return a + (b if a[-1] == "/" else f"/{b}") +def add_path(*argv): +    a = argv[0] +    for b in argv[1:]: +        a = a + (b if a[-1] == "/" else f"/{b}") +    return a  def loading_bar(completed, total, text,           unit="", color=DEFAULT_BAR_COLOR, reset=DEFAULT_BAR_COLOR_RESET): @@ -29,19 +32,57 @@ def loading_bar(completed, total, text,  def print_reset(text):      print(colors.RESET + text) -def curl(url): +def curl(url, raw=False):      try:          r = requests.get(url)      except:          return 500, "" -    return r.status_code, r.text +    return r.status_code, r.content if raw else r.text + +def get_unit(n): +    base = 1000 +    if n > base**4: return base**4, "TB" +    elif n > base**3: return base**3, "GB" +    elif n > base**2: return base**2, "MB" +    elif n > base**1: return base**1, "KB" + +def curl_to_file(url, path, text=""): +    with requests.get(url, stream=True) as r: +        r.raise_for_status() +        length = int(r.headers['content-length']) +        with open(path, "wb") as f: + +            c_size = 4096 +            ic = r.iter_content(chunk_size=c_size) +            done = 0 + +            for chunk in ic: +                if text: +                    divisor, unit = get_unit(length) +                    loading_bar(int(done/divisor), int(length/divisor), "Downloading " + text, unit=unit) + +                f.write(chunk) +                done += c_size +        if text: +            divisor, unit = get_unit(length) +            loading_bar(int(done/divisor), int(length/divisor), "Downloaded " + text, unit=unit) +        print(colors.RESET) + +        return r.status_code, path +  def mkdir(path):      if not os.path.exists(path):          os.makedirs(path) -def md5sum(data): -    return hashlib.md5(data) +def md5sum(filename): +    md5_hash = hashlib.md5() + +    with open(filename,"rb") as f: +        for byte_block in iter(lambda: f.read(4096),b""): +            md5_hash.update(byte_block) + +        return md5_hash.hexdigest()  def ask_confirmation(text, default=True, no_confirm=False):      yes = "Y" if default else "y" diff --git a/src/verbs/install.py b/src/verbs/install.py index 2026ab4..b5555a7 100644 --- a/src/verbs/install.py +++ b/src/verbs/install.py @@ -3,7 +3,23 @@ import re  import util  import colors  import time - +import requests +import hashlib + +def get_best_source(exclude=[], sources_list="/var/lib/xipkg/sources"): +    # TODO implement exclude +    source_speeds = {} +    with open(sources_list, "r") as file: +        for line in file.readlines(): +            split = line.split(" ") +            if len(split) > 0: +                try: +                    source_speeds[split[0]] = float(split[1]) +                except: +                    pass + +    return sorted(source_speeds.keys(), key=lambda k: source_speeds[k]) +          def find_package(query, repos, packages_dir, sources):      for repo in repos: @@ -24,14 +40,44 @@ def find_package(query, repos, packages_dir, sources):      return None, [], None -def retrieve_package_info(sources, checksum, package_name,  +def verify_signature(package_file, package_info,  +                            cache_dir="/var/cache/xipkg", keychain_dir="/var/lib/xipkg/keychain", +                            verbose=False): + +    checksum = package_info["CHECKSUM"] +     +    sig_cached_path = util.add_path(cache_dir, checksum + ".sig") +    with open(sig_cached_path, "wb") as file: +        file.write(package_info["SIGNATURE"]) + +    keys = os.listdir(keychain_dir) +    for key in keys: +        key_path = util.add_path(keychain_dir, key) +         +        command = f"openssl dgst -verify {key_path} -signature {sig_cached_path} {package_file}"  + +        if "OK" in os.popen(command).read(): +            return True +        elif verbose: +            print(colors.RED  +                    + f"Failed to verify signature against {key}" +                    + colors.RESET) + +    return False + +def retrieve_package_info(sources, checksum, package_name, config,                              verbose=False, skip_verification=False): + +    sources_list=config["dir"]["sources"] +    cache_dir=config["dir"]["cache"]      # TODO we may potentially do this a few times while resolving deps, might want to cache things here      # TODO actually use the ping times we made earlier to decide which source to pick -    for source,url in sources.items(): +    for source in get_best_source(sources_list=sources_list): +        url = sources[source] +          package_info_url = util.add_path(url, package_name + ".xipkg.info") -        status, response = util.curl(package_info_url) +        status, response = util.curl(package_info_url, raw=True)          if status == 200:              info = parse_package_info(response) @@ -46,40 +92,69 @@ def retrieve_package_info(sources, checksum, package_name,          print(colors.RED + f"No matching hashes found" + colors.RESET)      return {} -def retrieve_package(sources, checksum, package_name,  +def retrieve_package(sources, package_info, package_name, config,                              verbose=False, skip_verification=False): +    sources_list=config["dir"]["sources"] +    cache_dir=config["dir"]["cache"] +    keychain_dir=config["dir"]["keychain"] +      # TODO actually use the ping times we made earlier to decide which source to pick      # TODO actually save tar file, and add loading bar -    for source,url in sources.items(): -        package_info_url = util.add_path(url, package_name + ".xipkg") -        status, response = util.curl(package_info_url) -    + +    checksum = package_info["CHECKSUM"] + +    for source in get_best_source(sources_list=sources_list): +        url = sources[source] +        package_url = util.add_path(url, package_name + ".xipkg") +        package_dir = util.add_path(cache_dir, source) + +        util.mkdir(package_dir) +        status, package_path = util.curl_to_file(package_url, util.add_path(package_dir, package_name + ".xipkg"), text=package_name + ".xipkg") +          if status == 200: -            downloaded_checksum = util.md5sum(response) -            print(downloaded_checksum, "compared to requested", checksum) -            if downloaded_checksum == checksum or skip_verification: -                return reponse +            downloaded_checksum = util.md5sum(package_path) +             +            if not skip_verification: +                if downloaded_checksum == checksum: + +                    if verify_signature(package_path, package_info,  +                            cache_dir=cache_dir, keychain_dir=keychain_dir, verbose=verbose): +                        return package_path +                    elif verbose: +                        print(colors.RED  +                                + f"Failed to verify signature for {package_name} in {source}"  +                                + colors.RESET) +                elif verbose: +                        print(colors.RED  +                                + f"Checksum verification failed for {package_name} in {source}"  +                                + colors.RESET)              else: -                if verbose: -                    print(colors.RED  -                            + f"Checksum verification failed for {package_name} in {source}"  -                            + colors.RESET) +                return package_path      if verbose: -        print(colors.RED + f"No matching hashes found" + colors.RESET) -    return {} +        print(colors.RED + f"No valid packages found" + colors.RESET) +    return ""  def parse_package_info(packageinfo):      info = {} +    lines = packageinfo.split(b"\n") -    for line in packageinfo.split("\n"): -        split = line.split("=") +    index = 0 +    while index < len(lines): +        line = lines[index] +        split = line.split(b"=")          if len(split) > 1: -            info[split[0]] = "=".join(split[1:]) - +            if split[0] == b"SIGNATURE": +                index += 1 +                digest = b"\n".join(lines[index:]) +                info["SIGNATURE"] = digest +                break; +            else: +                info[str(split[0], "utf-8")] = str(b"=".join(split[1:]), "utf-8") +        index += 1      return info -def resolve_dependencies(package_info, config): +def resolve_dependencies(package_info):      getpkgs = lambda deps: re.findall("\w*", deps)      deps = getpkgs(package_info["DEPS"]) @@ -101,20 +176,20 @@ def find_all_dependencies(package_names, options, config):          dep_checksum, dep_sources, dep_repo = find_package(dep, config["repos"], config["dir"]["packages"], config["sources"])          if dep_checksum is not None:              info = retrieve_package_info( -                        dep_sources, dep_checksum, dep, +                        dep_sources, dep_checksum, dep, config,                          verbose=options["v"], skip_verification=options["u"]                      )              if len(info) > 0: -                all_deps.append(dep) -                deps = resolve_dependencies(info, config) -                for dep in deps: -                    if not dep in all_deps: - -                        if is_installed(dep, config): -                            print(colors.YELLOW + f"Package {query} has already been installed") -                        else: -                            to_check.append(dep) +                if not dep in all_deps: +                    all_deps.append(dep) +                    deps = resolve_dependencies(info) +                    for dep in deps: +                        if not dep in all_deps: +                            if is_installed(dep, config): +                                print(colors.YELLOW + f"Package {query} has already been installed") +                            else: +                                to_check.append(dep)              else:                  if options["v"]:                      util.print_reset(colors.CLEAR_LINE + colors.RED + f"Failed to retrieve info for {query}") @@ -154,7 +229,18 @@ def install(args, options, config):          print()          if util.ask_confirmation(colors.BLUE + "Continue?", no_confirm=options["y"]): -            print("installed") + +            for package in to_install: +                checksum, sources, repo = find_package(package, config["repos"], +                        config["dir"]["packages"], config["sources"]) + +                info = retrieve_package_info( +                            sources, checksum, package, config, +                            verbose=v, skip_verification=unsafe +                        ) + +                retrieve_package(sources, info, package, config, +                        verbose=v, skip_verification=unsafe)          else:              print(colors.RED + "Action cancelled by user")      else: @@ -20,7 +20,7 @@ verbs = { v: globals()[v] for v in [  def main():      opts = options.parse_args()      args = opts["args"] - +          if opts["h"]:          options.print_usage()          return @@ -3,6 +3,7 @@  include /etc/xipkg.d/default.conf  sources { +    #localhost       http://localhost:8089/repo/      davidovski      https://xi.davidovski.xyz/repo/      codeberg        https://xi.codeberg.page/repo/      ftp             https://xilinux.ftp.sh/repo/ | 
