summaryrefslogtreecommitdiff
path: root/src/verbs
diff options
context:
space:
mode:
authordavidovski <git@davidovski.xyz>2021-11-22 12:27:24 +0000
committerdavidovski <git@davidovski.xyz>2021-11-22 12:27:24 +0000
commit87d9f2978513e76aae86b25bb8660e49bae20061 (patch)
tree9f03986f9caf67118b22d803494e185ca9d2efc0 /src/verbs
parenta99d8c6a068e410b69bc506313895920ee2fcb2a (diff)
added automatic key importing
fixed best source to actually use available sources instead of just any
Diffstat (limited to 'src/verbs')
-rw-r--r--src/verbs/install.py35
-rw-r--r--src/verbs/sync.py36
2 files changed, 42 insertions, 29 deletions
diff --git a/src/verbs/install.py b/src/verbs/install.py
index 6871f2e..0984793 100644
--- a/src/verbs/install.py
+++ b/src/verbs/install.py
@@ -6,15 +6,15 @@ import time
import requests
import hashlib
-def get_best_source(exclude=[], sources_list="/var/lib/xipkg/sources"):
- # TODO implement exclude
+def get_best_source(available, sources_list="/var/lib/xipkg/sources"):
source_speeds = {}
with open(sources_list, "r") as file:
for line in file.readlines():
split = line.split(" ")
if len(split) > 0:
try:
- source_speeds[split[0]] = float(split[1])
+ if split[0] in available:
+ source_speeds[split[0]] = float(split[1])
except:
pass
@@ -50,19 +50,22 @@ def verify_signature(package_file, package_info,
with open(sig_cached_path, "wb") as file:
file.write(package_info["SIGNATURE"])
- keys = os.listdir(keychain_dir)
- for key in keys:
- key_path = util.add_path(keychain_dir, key)
-
- command = f"openssl dgst -verify {key_path} -signature {sig_cached_path} {package_file}"
+ if os.path.exists(keychain_dir):
+ keys = os.listdir(keychain_dir)
+ for key in keys:
+ key_path = util.add_path(keychain_dir, key)
+
+ command = f"openssl dgst -verify {key_path} -signature {sig_cached_path} {package_file}"
- if "OK" in os.popen(command).read():
- return True
- elif verbose:
- print(colors.RED
- + f"Failed to verify signature against {key}"
- + colors.RESET)
+ if "OK" in os.popen(command).read():
+ return True
+ elif verbose:
+ print(colors.RED
+ + f"Failed to verify signature against {key}"
+ + colors.RESET)
+ elif verbose:
+ print(colors.BLACK + "There are no keys to verify with")
return False
def retrieve_package_info(sources, checksum, package_name, config,
@@ -72,7 +75,7 @@ def retrieve_package_info(sources, checksum, package_name, config,
cache_dir=config["dir"]["cache"]
# TODO we may potentially do this a few times while resolving deps, might want to cache things here
- for source in get_best_source(sources_list=sources_list):
+ for source in get_best_source(sources, sources_list=sources_list):
url = sources[source]
package_info_url = util.add_path(url, package_name + ".xipkg.info")
@@ -100,7 +103,7 @@ def retrieve_package(sources, package_info, package_name, config,
checksum = package_info["CHECKSUM"]
- for source in get_best_source(sources_list=sources_list):
+ for source in get_best_source(sources, sources_list=sources_list):
url = sources[source]
if verbose:
print(colors.LIGHT_BLACK + f"using source {source} at {url}")
diff --git a/src/verbs/sync.py b/src/verbs/sync.py
index 3821f82..8dde22d 100644
--- a/src/verbs/sync.py
+++ b/src/verbs/sync.py
@@ -71,18 +71,21 @@ def save_package(package, info, location):
# security problem to automatically decide to verify keys
# users should do this manually whenever they add a new source
###### !!! #######
-def import_key(source, url, verbose=False):
+def import_key(source, url, config, verbose=False):
keyname = "xi.pub"
- status, response = curl(url + keyname if url[-1] == "/" else f"/{keyname}")
- if status == 200:
- key_path = os.path.join(config["dir"]["keychain"], source + ".pub")
- with open(key_path, "w"):
- key_path.write(key_path)
+ keychain_dir = config["dir"]["keychain"]
+ util.mkdir(keychain_dir)
+ key_path = os.path.join(keychain_dir, source + ".pub")
- elif verbose:
- print(colors.BG_RED + f"" + colors.RESET)
+ if os.path.exists(key_path):
+ if verbose:
+ print(colors.LIGHT_BLACK + f"Skipping already imported key from {source}")
+ return 0
+ else:
+ key_path = util.curl_to_file(url + keyname if url[-1] == "/" else f"/{keyname}", key_path)
+ return 1
def test_source(source, url):
# requesting a resource may not be the best way to do this, caching etc
@@ -123,11 +126,9 @@ def sync(args, options, config):
v = options["v"]
- # test_sources(sources, config["dir"]["sources"], test_count=int(config["pings"]))
-
for repo in repos:
- if v:
- print(colors.LIGHT_BLACK + f"downloading package lists for {repo}...")
+ if v: print(colors.LIGHT_BLACK + f"downloading package lists for {repo}...")
+
packages, speeds = sync_packages(repo, sources, verbose=v)
if v: print(colors.LIGHT_BLACK + f"downloaded {len(packages)} packages from {len(sources)} sources")
@@ -148,7 +149,16 @@ def sync(args, options, config):
util.loading_bar(total, total, f"Synced {repo}")
print(colors.RESET)
-
+ if "key_authority" in config:
+ imported = 0
+ authorities = config["key_authority"]
+ for authority in authorities:
+ if authority in sources:
+ url = sources[authority]
+ imported += import_key(authority, url, config, verbose=v)
+ elif v:
+ print(colors.RED + f"Cannot find authority {authority} in sources")
+ if imported > 0: print(colors.CYAN + f"Imported keys from {imported} sources")
#total = len(sources)
#completed = 0
#for source, url in sources: