summaryrefslogtreecommitdiff
path: root/Reflector.py
diff options
context:
space:
mode:
Diffstat (limited to 'Reflector.py')
-rw-r--r--Reflector.py964
1 files changed, 547 insertions, 417 deletions
diff --git a/Reflector.py b/Reflector.py
index b81f910..bcbb82f 100644
--- a/Reflector.py
+++ b/Reflector.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
-# Copyright (C) 2012, 2013 Xyne
+# Copyright (C) 2012-2019 Xyne
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -22,6 +22,7 @@ import datetime
import errno
import getpass
import http.client
+import itertools
import json
import logging
import os
@@ -37,10 +38,287 @@ import time
import urllib.error
import urllib.request
+################################## Constants ###################################
+NAME = 'Reflector'
+
+URL = 'https://www.archlinux.org/mirrors/status/json/'
+
+DISPLAY_TIME_FORMAT = '%Y-%m-%d %H:%M:%S UTC'
+PARSE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
+PARSE_TIME_FORMAT_WITH_USEC = '%Y-%m-%dT%H:%M:%S.%fZ'
+
+DB_SUBPATH = 'core/os/x86_64/core.db'
+
+MIRROR_URL_FORMAT = '{0}{1}/os/{2}'
+MIRRORLIST_ENTRY_FORMAT = "Server = " + MIRROR_URL_FORMAT + "\n"
+
+DEFAULT_CONNECTION_TIMEOUT = 5
+DEFAULT_CACHE_TIMEOUT = 300
+DEFAULT_N_THREADS = 5
+
+SORT_TYPES = {
+ 'age' : 'last server synchronization',
+ 'rate' : 'download rate',
+ 'country': 'server\'s location',
+ 'score' : 'MirrorStatus score',
+ 'delay' : 'MirrorStatus delay',
+}
+
+
+################################# IO Functions #################################
+
+def get_cache_file():
+ '''
+ Get a nearly XDG-compliant cache directory. PyXDG is not used to avoid the
+ external dependency. It is not fully compliant because it omits the
+ application name, but the mirror status file can be reused by other
+ applications and this stores no other files.
+ '''
+ base_name = 'mirrorstatus.json'
+ cache_dir = os.getenv('XDG_CACHE_HOME', default=os.path.expanduser('~/.cache'))
+ try:
+ os.makedirs(cache_dir, exist_ok=True)
+ # Raised by makedirs if permissions do not match umask
+ except FileExistsError:
+ pass
+ return os.path.join(cache_dir, base_name)
+
+
+
+def get_mirrorstatus(
+ connection_timeout=DEFAULT_CONNECTION_TIMEOUT,
+ cache_timeout=DEFAULT_CACHE_TIMEOUT
+):
+ '''
+ Retrieve the mirror status JSON object. The downloaded data will be cached
+ locally and re-used within the cache timeout period. Returns the object and
+ the local cache's modification time.
+ '''
+ cache_path = get_cache_file()
+ try:
+ mtime = os.path.getmtime(cache_path)
+ invalid = (time.time() - mtime) > cache_timeout
+ except FileNotFoundError:
+ mtime = None
+ invalid = True
+
+ try:
+ if invalid:
+ with urllib.request.urlopen(URL, None, connection_timeout) as h:
+ obj = json.loads(h.read().decode())
+ with open(cache_path, 'w') as h:
+ json.dump(obj, h, sort_keys=True, indent=2)
+ mtime = time.time()
+ else:
+ with open(cache_path, 'r') as h:
+ obj = json.load(h)
+
+ return obj, mtime
+ except (IOError, urllib.error.URLError, socket.timeout) as e:
+ raise MirrorStatusError(str(e))
+
+
+
+################################ Miscellaneous #################################
+
+def get_logger():
+ '''
+ Get the logger used by this module. Use this to be sure that the right logger
+ is used.
+ '''
+ return logging.getLogger(NAME)
+
+
+
+def format_last_sync(mirrors):
+ '''
+ Parse and format the "last_sync" field.
+ '''
+ for m in mirrors:
+ last_sync = calendar.timegm(time.strptime(m['last_sync'], PARSE_TIME_FORMAT))
+ m.update(last_sync=last_sync)
+ yield m
+
+
+
+def count_countries(mirrors):
+ '''
+ Count the mirrors in each country.
+ '''
+ countries = dict()
+ for m in mirrors:
+ k = (m['country'], m['country_code'])
+ if not any(k):
+ continue
+ try:
+ countries[k] += 1
+ except KeyError:
+ countries[k] = 1
+ return countries
+
+
+
+################################### Sorting ####################################
+
+def sort(mirrors, by=None, n_threads=DEFAULT_N_THREADS):
+ '''
+ Sort mirrors by different criteria.
+ '''
+ # Ensure that "mirrors" is a list that can be sorted.
+ if not isinstance(mirrors, list):
+ mirrors = list(mirrors)
+
+ if by == 'age':
+ mirrors.sort(key=lambda m: m['last_sync'], reverse=True)
+
+ elif by == 'rate':
+ rates = rate(mirrors, n_threads=n_threads)
+ mirrors = sorted(mirrors, key=lambda m: rates[m['url']], reverse=True)
+
+ else:
+ try:
+ mirrors.sort(key=lambda m: m[by])
+ except KeyError:
+ raise MirrorStatusError('attempted to sort mirrors by unrecognized criterion: "{}"'.format(by))
+
+ return mirrors
+
+
+#################################### Rating ####################################
+
+def rate_rsync(db_url, connection_timeout=DEFAULT_CONNECTION_TIMEOUT):
+ '''
+ Download a database via rsync and return the time and rate of the download.
+ '''
+ rsync_cmd = [
+ 'rsync',
+ '-avL', '--no-h', '--no-motd',
+ '--contimeout={}'.format(connection_timeout),
+ db_url
+ ]
+ try:
+ with tempfile.TemporaryDirectory() as tmpdir:
+ t0 = time.time()
+ subprocess.check_call(
+ rsync_cmd + [tmpdir],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL
+ )
+ dt = time.time() - t0
+ size = os.path.getsize(
+ os.path.join(tmpdir, os.path.basename(DB_SUBPATH))
+ )
+ r = size / dt
+ return dt, r
+ except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
+ return None, 0
+
+
+
+def rate_http(db_url, connection_timeout=DEFAULT_CONNECTION_TIMEOUT):
+ '''
+ Download a database via any protocol supported by urlopen and return the time
+ and rate of the download.
+ '''
+ req = urllib.request.Request(url=db_url)
+ try:
+ with urllib.request.urlopen(req, None, connection_timeout) as f:
+ t0 = time.time()
+ size = len(f.read())
+ dt = time.time() - t0
+ r = size / (dt)
+ return dt, r
+ except (OSError, urllib.error.HTTPError, http.client.HTTPException):
+ return None, 0
+
+
+
+def rate(mirrors, n_threads=DEFAULT_N_THREADS, connection_timeout=DEFAULT_CONNECTION_TIMEOUT):
+ '''
+ Rate mirrors by timing the download the core repo's database for each one.
+ '''
+ # Ensure that mirrors is not a generator so that its length can be determined.
+ if not isinstance(mirrors, tuple):
+ mirrors = tuple(mirrors)
+
+ if not mirrors:
+ return None
+
+ # At least 1 thread and not more than the number of mirrors.
+ n_threads = max(1, min(n_threads, len(mirrors)))
+
+ # URL input queue.
+ q_in = queue.Queue()
+ # URL, elapsed time and rate output queue.
+ q_out = queue.Queue()
+
+
+ def worker():
+ while True:
+ # To stop a thread, an integer will be inserted in the input queue. Each
+ # thread will increment it and re-insert it until it equals the
+ # threadcount. After encountering the integer, the thread exits the loop.
+ url = q_in.get()
+
+ if isinstance(url, int):
+ if url < n_threads:
+ q_in.put(url + 1)
+
+ else:
+ db_url = url + DB_SUBPATH
+ scheme = urllib.parse.urlparse(url).scheme
+
+ if scheme == 'rsync':
+ dt, r = rate_rsync(db_url, connection_timeout)
+ else:
+ dt, r = rate_http(db_url, connection_timeout)
+
+ q_out.put((url, dt, r))
+
+ q_in.task_done()
+
+
+ workers = tuple(threading.Thread(target=worker) for _ in range(n_threads))
+ for w in workers:
+ w.daemon = True
+ w.start()
+
+ url_len = max(len(m['url']) for m in mirrors)
+ logger = get_logger()
+ for m in mirrors:
+ url = m['url']
+ logger.info("rating {}".format(url))
+ q_in.put(url)
+
+ # To exit the threads.
+ q_in.put(0)
+ q_in.join()
+
+ header_fmt = '{{:{:d}s}} {{:>14s}} {{:>9s}}'.format(url_len)
+ logger.info(header_fmt.format('Server', 'Rate', 'Time'))
+ fmt = '{{:{:d}s}} {{:8.2f}} KiB/s {{:7.2f}} s'.format(url_len)
+
+ # Loop over the mirrors just to ensure that we get the rate for each mirror.
+ # The value in the loop does not (necessarily) correspond to the mirror.
+ rates = dict()
+ for _ in mirrors:
+ url, dt, r = q_out.get()
+ kibps = r / 1024.0
+ logger.info(fmt.format(url, kibps, dt))
+ rates[url] = r
+ q_out.task_done()
+
+ return rates
+
+
+
+############################## MirrorStatusError ###############################
-# Generic MirrorStatus Exception
class MirrorStatusError(Exception):
+ '''
+ Common base exception raised by this module.
+ '''
def __init__(self, msg):
self.msg = msg
def __str__(self):
@@ -48,44 +326,168 @@ class MirrorStatusError(Exception):
-def get_cache_file():
- bname = 'mirrorstatus.json'
- path = os.getenv('XDG_CACHE_HOME')
- if path:
- try:
- os.makedirs(path, exist_ok=True)
- # Raised if permissions do not match umask
- except FileExistsError:
- pass
- return os.path.join(path, bname)
+
+############################## MirrorStatusFilter ##############################
+
+class MirrorStatusFilter():
+
+ def __init__(
+ self,
+ min_completion_pct=1.0,
+ countries=None,
+ protocols=None,
+ include=None,
+ exclude=None,
+ age=None,
+ isos=False,
+ ipv4=False,
+ ipv6=False
+ ):
+ self.min_completion_pct = min_completion_pct
+ self.countries = tuple(c.upper() for c in countries) if countries else None
+ self.protocols = protocols
+ self.include = tuple(re.compile(r) for r in include) if include else None
+ self.exclude = tuple(re.compile(r) for r in exclude) if exclude else None
+ self.age = age
+ self.isos = isos
+ self.ipv4 = ipv4
+ self.ipv6 = ipv6
+
+
+ def filter_mirrors(self, mirrors):
+ # Filter unsynced mirrors.
+ mirrors = (m for m in mirrors if m['last_sync'])
+
+ # Parse the last sync time.
+ mirrors = format_last_sync(mirrors)
+
+ # Filter by completion "percent" [0-1].
+ mirrors = (m for m in mirrors if m['completion_pct'] >= self.min_completion_pct)
+
+ # Filter by countries.
+ if self.countries:
+ mirrors = (
+ m for m in mirrors
+ if m['country'].upper() in self.countries
+ or m['country_code'].upper() in self.countries
+ )
+
+ # Filter by protocols.
+ if self.protocols:
+ mirrors = (m for m in mirrors if m['protocol'] in self.protocols)
+
+ # Filter by include expressions.
+ if self.include:
+ mirrors = (m for m in mirrors if any(r.search(m['url']) for r in self.include))
+
+ # Filter by exclude expressions.
+ if self.exclude:
+ mirrors = (m for m in mirrors if not any(r.search(m['url']) for r in self.exclude))
+
+ # Filter by age. The age is given in hours and converted to seconds. Servers
+ # with a last refresh older than the age are omitted.
+ if self.age and self.age > 0:
+ t = time.time()
+ a = self.age * 60**2
+ mirrors = (m for m in mirrors if (m['last_sync'] + a) >= t)
+
+ # The following does not work. Only the final iteration affects "mirrors".
+ # TODO: Understand exactly why the code above works but the loop doesn't.
+ # for field in ('isos', 'ipv4', 'ipv6'):
+ # if getattr(self, field):
+ # mirrors = (m for m in mirrors if m[field])
+
+ # Filter by ISO hosing.
+ if self.isos:
+ mirrors = (m for m in mirrors if m['isos'])
+
+ # Filter by IPv4 support.
+ if self.ipv4:
+ mirrors = (m for m in mirrors if m['ipv4'])
+
+ # Filter by IPv6 support.
+ if self.ipv6:
+ mirrors = (m for m in mirrors if m['ipv6'])
+
+ yield from mirrors
+
+
+
+################################## Formatting ##################################
+
+def format_mirrorlist(mirror_status, mtime, include_country=False, command=None):
+ if command is None:
+ command = '?'
else:
- return '/tmp/.{}.{}'.format(getpass.getuser(), bname)
-
-
-
-class MirrorStatus(object):
- # JSON URI
- URL = 'https://www.archlinux.org/mirrors/status/json/'
- # Mirror URL format. Accepts server base URL, repository, and architecture.
- MIRROR_URL_FORMAT = '{0}{1}/os/{2}'
- MIRRORLIST_ENTRY_FORMAT = "Server = " + MIRROR_URL_FORMAT + "\n"
- DISPLAY_TIME_FORMAT = '%Y-%m-%d %H:%M:%S UTC'
- PARSE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
- # Required for the last_check field, which oddly includes microseconds.
- PARSE_TIME_FORMAT_WITH_USEC = '%Y-%m-%dT%H:%M:%S.%fZ'
- # Recognized list sort types and their descriptions.
- SORT_TYPES = {
- 'age' : 'last server synchronization',
- 'rate' : 'download rate',
- 'country': 'server\'s location',
- 'score' : 'MirrorStatus score',
- 'delay' : 'MirrorStatus delay',
- }
- # Known repositories, i.e. those that should be on each mirror.
- # Used to replace the "$repo" variable.
- # TODO
- # Avoid using a hard-coded list.
- # See https://bugs.archlinux.org/task/32895
+ command = 'reflector ' + ' '.join(pipes.quote(x) for x in command)
+
+ last_check = mirror_status['last_check']
+ # For some reason the "last_check" field included microseconds.
+ try:
+ parsed_last_check = datetime.datetime.strptime(
+ last_check,
+ PARSE_TIME_FORMAT_WITH_USEC,
+ ).timetuple()
+ except ValueError:
+ parsed_last_check = datetime.datetime.strptime(
+ last_check,
+ PARSE_TIME_FORMAT,
+ ).timetuple()
+
+ width = 80
+ colw = 11
+ header = '# Arch Linux mirrorlist generated by Reflector #'.center(width, '#')
+ border = '#' * len(header)
+ mirrorlist = ''
+ mirrorlist = '{}\n{}\n{}\n'.format(border, header, border) + \
+ '\n' + \
+ '\n'.join(
+ '# {{:<{:d}s}} {{}}'.format(colw).format(k, v) for k, v in (
+ ('With:', command),
+ ('When:', time.strftime(DISPLAY_TIME_FORMAT, time.gmtime())),
+ ('From:', URL),
+ ('Retrieved:', time.strftime(DISPLAY_TIME_FORMAT, time.gmtime(mtime))),
+ ('Last Check:', time.strftime(DISPLAY_TIME_FORMAT, parsed_last_check))
+ )
+ ) + \
+ '\n\n'
+
+ country = None
+
+ mirrors = mirror_status['urls']
+ for mirror in mirrors:
+ # Include country tags. This is intended for lists that are sorted by
+ # country.
+ if include_country:
+ c = '{} [{}]'.format(mirror['country'], mirror['country_code'])
+ if c != country:
+ if country:
+ mirrorlist += '\n'
+ mirrorlist += '# {}\n'.format(c)
+ country = c
+ mirrorlist += MIRRORLIST_ENTRY_FORMAT.format(mirror['url'], '$repo', '$arch')
+
+ if mirrors:
+ return mirrorlist
+ else:
+ return None
+
+
+
+
+
+############################ MirrorStatus Retriever ############################
+
+class MirrorStatus():
+ '''
+ This is a legacy class that will likely be removed in the future. It
+ previously held most of this module's functionality until it was refactored
+ into more modular functions. Seemingly pointless code is still used by
+ importers of this module.
+ '''
+
+ # TODO: move these to another module or remove them completely
+ # Related: https://bugs.archlinux.org/task/32895
REPOSITORIES = (
'community',
'community-staging',
@@ -99,325 +501,85 @@ class MirrorStatus(object):
'staging',
'testing'
)
-
- # Known system architectures, as used to replace the "$arch" variable.
+ # Officially supported system architectures.
ARCHITECTURES = ['x86_64']
- # Initialize
- # refresh_interval:
- # The cached list will be replaced after this many seconds have passed.
- # 0 effectively disables caching.
- # Caching is only useful if the object persists, e.g. if it were embedded
- # in a server.
+ MIRROR_URL_FORMAT = MIRROR_URL_FORMAT
+ MIRRORLIST_ENTRY_FORMAT = MIRRORLIST_ENTRY_FORMAT
+
+
def __init__(
self,
- refresh_interval=0,
- verbose=False,
- connection_timeout=5,
-# download_timeout=None,
- cache_timeout=300,
- min_completion_pct=1.,
- threads=5
+ connection_timeout=DEFAULT_CONNECTION_TIMEOUT,
+ cache_timeout=DEFAULT_CACHE_TIMEOUT,
+ min_completion_pct=1.0,
+ threads=DEFAULT_N_THREADS
):
- self.refresh_interval = refresh_interval
-
- # Last modification time of the json object.
- self.json_mtime = 0
- # The parsed JSON object.
- self.json_obj = {}
- # Display extra information.
- self.verbose = verbose
- # Connection timeout
self.connection_timeout = connection_timeout
- # Download timeout
-# self.download_timeout = download_timeout
- # Cache timeout
self.cache_timeout = cache_timeout
- # Minimum completion percent, for filtering mirrors.
self.min_completion_pct = min_completion_pct
- # Threads
self.threads = threads
+ self.mirror_status = None
+ self.ms_mtime = 0
def retrieve(self):
- """Retrieve the current mirror status JSON data."""
- self.json_obj = None
- json_str = None
- save_json = False
-
- cache_file = get_cache_file()
- if self.cache_timeout > 0:
- save_json = True
- try:
- mtime = os.path.getmtime(cache_file)
- if time.time() - mtime < self.cache_timeout:
- try:
- with open(cache_file) as f:
- self.json_obj = json.load(f)
- self.json_mtime = mtime
- save_json = False
- except IOError as e:
- raise MirrorStatusError('failed to load cached JSON data ({})'.format(e))
- except OSError as e:
- if e.errno != errno.ENOENT:
- raise MirrorStatusError('failed to get cache file mtime ({})'.format(e))
-
- if not self.json_obj:
- try:
- with urllib.request.urlopen(MirrorStatus.URL, None, self.connection_timeout) as f:
- json_str = f.read()
- self.json_obj = json.loads(json_str.decode())
- self.json_mtime = time.time()
- except (urllib.error.URLError, socket.timeout) as e:
- raise MirrorStatusError('failed to retrieve mirror data: ({})'.format(e))
- except ValueError as e:
- raise MirrorStatusError('failed to parse retrieved mirror data: ({})'.format(e))
-
- try:
- # Remove servers that have not synced, and parse the "last_sync" times for
- # comparison later.
- mirrors = self.json_obj['urls']
- # Filter incomplete mirrors and mirrors that haven't synced.
- mirrors = list(
- m for m in mirrors
- if m['last_sync']
- and m['completion_pct'] >= self.min_completion_pct
- )
- # Parse 'last_sync' times for future comparison.
- for mirror in mirrors:
- mirror['last_sync'] = calendar.timegm(
- time.strptime(mirror['last_sync'],
- MirrorStatus.PARSE_TIME_FORMAT)
- )
- self.json_obj['urls'] = mirrors
- except KeyError:
- raise MirrorStatusError('failed to parse retrieved mirror data (the format may have changed or there may be a transient error)')
-
- if save_json and json_str:
- try:
- with open(cache_file, 'wb') as f:
- f.write(json_str)
- except IOError as e:
- raise MirrorStatusError('failed to cache JSON data ({})'.format(e))
-
-
-
+ self.mirror_status, self.ms_mtime = get_mirrorstatus(
+ connection_timeout=self.connection_timeout,
+ cache_timeout=self.cache_timeout
+ )
def get_obj(self):
- """Return the JSON object, retrieving new data if necessary."""
- if not self.json_obj \
- or time.time() > (self.json_mtime + self.refresh_interval):
+ '''
+ Get the JSON mirror status.
+ '''
+ t = time.time()
+ if (t - self.ms_mtime) > self.cache_timeout:
self.retrieve()
-
- return self.json_obj
+ return self.mirror_status
def get_mirrors(self):
- """Get the mirrors."""
- return self.get_obj()['urls']
-
+ '''
+ Get the mirror from the mirror status.
+ '''
+ obj = self.get_obj()
+ try:
+ return obj['urls']
+ except KeyError:
+ raise MirrorStatusError('no mirrors detected in mirror status output')
- def filter(
- self,
- mirrors=None,
- countries=None,
- regexes=None, # TODO: remove
- include=None,
- exclude=None,
- age=None,
- protocols=None
- ):
- """Filter using different parameters."""
- # TODO: remove
- if regexes:
-# raise MirrorStatusError('The "regexes" keyword has been deprecated and replaced by "include" and "exclude".')
- if not include:
- include = regexes
- sys.stderr.write('''WARNING: The "regexes" keyword has been deprecated and replaced by "include" and "exclude".
- Support will be soon removed without further warning.''')
- if mirrors is None:
- mirrors = self.get_mirrors()
- t = time.time()
- n = 0
-
- # Make country arguments case-insensitive.
- uc_countries = tuple(c.upper() for c in countries) if countries else None
- for mirror in mirrors:
- # Filter by country.
- if countries \
- and not ( \
- mirror['country'].upper() in uc_countries or \
- mirror['country_code'].upper() in uc_countries \
- ):
- continue
- # Filter by protocol.
- if protocols and not mirror['protocol'] in protocols:
- continue
- # Filter by regex.
- # TODO: Find a better way to do this.
- if include:
- for regex in include:
- if re.search(regex, mirror['url']):
- break
- else:
- continue
- if exclude:
- discard = False
- for regex in exclude:
- if re.search(regex, mirror['url']):
- discard = True
- break
- if discard:
- continue
- # Filter by hours since last sync.
- if age and t > (age * 60**2 + mirror['last_sync']):
- continue
-
- # Yield if we're still here.
- yield mirror
-
-
-
- def sort(self, mirrors=None, by=None):
- """Sort using different parameters."""
+ def filter(self, mirrors=None, **kwargs):
+ '''
+ Filter mirrors by various criteria.
+ '''
if mirrors is None:
mirrors = self.get_mirrors()
- # Ensure that "mirrors" is a list that can be sorted.
- if not isinstance(mirrors, list):
- mirrors = list(mirrors)
+ msf = MirrorStatusFilter(min_completion_pct=self.min_completion_pct, **kwargs)
+ yield from msf.filter_mirrors(mirrors)
- if by == 'age':
- mirrors.sort(key=lambda m: m['last_sync'], reverse=True)
- elif by == 'rate':
- mirrors = self.rate(mirrors)
- elif by in ('country', 'country_code', 'delay', 'score'):
- mirrors.sort(key=lambda m: m[by])
- return mirrors
-
- # Sort mirrors by download speed. Download speed will be calculated from the
- # download time of the [core] database from each server.
- # TODO: Consider ways to improve this.
- # TODO: Consider the effects of threading (do the threads affect the results
- # by competing for bandwidth?)
- def rate(self, mirrors=None, threads=5):
+ def sort(self, mirrors=None, **kwargs):
+ '''
+ Sort mirrors by various criteria.
+ '''
if mirrors is None:
mirrors = self.get_mirrors()
- if not threads:
- threads = self.threads
- # Ensure that "mirrors" is a list and not a generator.
- if not isinstance(mirrors, list):
- mirrors = list(mirrors)
-
- if not mirrors:
- logging.warning('no mirrors selected for rating')
- return mirrors
-
- # Ensure a sane number of threads.
- if threads < 1:
- threads = 1
- else:
- threads = min(threads, len(mirrors))
-
- rates = {}
-
- # URL input queue.Queue
- q_in = queue.Queue()
- # URL and rate output queue.Queue
- q_out = queue.Queue()
- def worker():
- while True:
- url = q_in.get()
- db_subpath = 'core/os/x86_64/core.db'
- db_url = url + db_subpath
- scheme = urllib.parse.urlparse(url).scheme
- # Leave the rate as 0 if the connection fails.
- # TODO: Consider more graceful error handling.
- rate = 0
- dt = float('NaN')
-
- # urllib cannot handle rsync protocol
- if scheme == 'rsync':
- rsync_cmd = [
- 'rsync',
- '-avL', '--no-h', '--no-motd',
- '--contimeout={}'.format(self.connection_timeout),
- db_url
- ]
- try:
- with tempfile.TemporaryDirectory() as tmpdir:
- t0 = time.time()
- subprocess.check_call(
- rsync_cmd + [tmpdir],
- stdout=subprocess.DEVNULL,
- stderr=subprocess.DEVNULL
- )
- dt = time.time() - t0
- size = os.path.getsize(os.path.join(
- tmpdir,
- os.path.basename(db_subpath)
- ))
- rate = size / dt
- except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
- pass
- else:
- req = urllib.request.Request(url=db_url)
- try:
- t0 = time.time()
- with urllib.request.urlopen(req, None, self.connection_timeout) as f:
- size = len(f.read())
- dt = time.time() - t0
- rate = size / (dt)
- except (OSError, urllib.error.HTTPError, http.client.HTTPException):
- pass
- q_out.put((url, rate, dt))
- q_in.task_done()
-
- # Launch threads
- for i in range(threads):
- t = threading.Thread(target=worker)
- t.daemon = True
- t.start()
-
-
- # Load the input queue.Queue
- url_len = max(len(m['url']) for m in mirrors)
- for mirror in mirrors:
- logging.info("rating {}".format(mirror['url']))
- q_in.put(mirror['url'])
-
- q_in.join()
-
-
- # Get the results
- # The "in mirrors" loop is just used to ensure that the right number of
- # items is retrieved.
+ yield from sort(mirrors, n_threads=self.threads, **kwargs)
- # Display some extra data.
- header_fmt = '{{:{:d}s}} {{:>14s}} {{:>9s}}'.format(url_len)
- logging.info(header_fmt.format('Server', 'Rate', 'Time'))
- fmt = '{{:{:d}s}} {{:8.2f}} KiB/s {{:7.2f}} s'.format(url_len)
- # Loop over the mirrors just to ensure that we get the rate for each mirror.
- # The value in the loop does not (necessarily) correspond to the mirror.
- for _ in mirrors:
- url, rate, dt = q_out.get()
- kibps = rate / 1024.0
- logging.info(fmt.format(url, kibps, dt))
- rates[url] = rate
- q_out.task_done()
-
-
- # Sort by rate.
- rated_mirrors = [m for m in mirrors if rates[m['url']] > 0]
- rated_mirrors.sort(key=lambda m: rates[m['url']], reverse=True)
-
- return rated_mirrors + [m for m in mirrors if rates[m['url']] == 0]
+ def rate(self, mirrors=None, **kwargs):
+ '''
+ Sort mirrors by download speed.
+ '''
+ if mirrors is None:
+ mirrors = self.get_mirrors()
+ yield from sort(mirrors, n_threads=self.threads, by='rate', **kwargs)
@@ -427,85 +589,29 @@ class MirrorStatus(object):
-
- # Return a Pacman-formatted mirrorlist
- # TODO: Reconsider the assumption that self.json_obj has been retrieved.
def get_mirrorlist(self, mirrors=None, include_country=False, cmd=None):
- if mirrors is None:
- mirrors = self.get_mirrors()
- if cmd is None:
- cmd = '?'
- else:
- cmd = 'reflector ' + ' '.join(pipes.quote(x) for x in cmd)
-
- last_check = self.json_obj['last_check']
- # For some reason the "last_check" field included microseconds.
- try:
- parsed_last_check = datetime.datetime.strptime(
- last_check,
- self.PARSE_TIME_FORMAT_WITH_USEC,
- ).timetuple()
- except ValueError:
- parsed_last_check = datetime.datetime.strptime(
- last_check,
- self.PARSE_TIME_FORMAT,
- ).timetuple()
-
- width = 80
- colw = 11
- header = '# Arch Linux mirrorlist generated by Reflector #'.center(width, '#')
- border = '#' * len(header)
- mirrorlist = '{}\n{}\n{}\n'.format(border, header, border) + \
- '\n' + \
- '\n'.join(
- '# {{:<{:d}s}} {{}}'.format(colw).format(k, v) for k, v in (
- ('With:', cmd),
- ('When:', self.display_time(time.gmtime())),
- ('From:', MirrorStatus.URL),
- ('Retrieved:', self.display_time(time.gmtime(self.json_mtime))),
- ('Last Check:', self.display_time(parsed_last_check)),
- )
- ) + \
- '\n\n'
-
- country = None
-
- # mirrors may be a generator so "if mirrors" will not work
- no_mirrors = True
- for mirror in mirrors:
- no_mirrors = False
- # Include country tags. This is intended for lists that are sorted by
- # country.
- if include_country:
- c = '{} [{}]'.format(mirror['country'], mirror['country_code'])
- if c != country:
- if country:
- mirrorlist += '\n'
- mirrorlist += '# {}\n'.format(c)
- country = c
- mirrorlist += MirrorStatus.MIRRORLIST_ENTRY_FORMAT.format(mirror['url'], '$repo', '$arch')
-
- if no_mirrors:
- return None
- else:
- return mirrorlist
+ '''
+ Get a Pacman-formatted mirrorlist.
+ '''
+ obj = self.get_obj().copy()
+ if mirrors is not None:
+ if not isinstance(mirrors, list):
+ mirrors = list(mirrors)
+ obj['urls'] = mirrors
+ return format_mirrorlist(obj, self.ms_mtime, include_country=include_country, command=cmd)
def list_countries(self):
- countries = dict()
- for m in self.get_mirrors():
- k = (m['country'], m['country_code'])
- try:
- countries[k] += 1
- except KeyError:
- countries[k] = 1
- return countries
-
-
+ '''
+ List countries along with a server count for each one.
+ '''
+ mirrors = self.get_mirrors()
+ return count_countries(mirrors)
+############################### argparse Actions ###############################
class ListCountries(argparse.Action):
'''
@@ -523,11 +629,12 @@ class ListCountries(argparse.Action):
-def print_mirror_info(mirrors, time_fmt=MirrorStatus.DISPLAY_TIME_FORMAT):
+def print_mirror_info(mirrors, time_fmt=DISPLAY_TIME_FORMAT):
'''
Print information about each mirror to STDOUT.
'''
if mirrors:
+ # mirrors = format_last_sync(mirrors)
if not isinstance(mirrors, list):
mirrors = list(mirrors)
ks = sorted(k for k in mirrors[0].keys() if k != 'url')
@@ -551,8 +658,8 @@ def add_arguments(parser):
parser = argparse.ArgumentParser(description='retrieve and filter a list of the latest Arch Linux mirrors')
parser.add_argument(
- '--connection-timeout', type=int, metavar='n', default=5,
- help='The number of seconds to wait before a connection times out.'
+ '--connection-timeout', type=int, metavar='n', default=DEFAULT_CONNECTION_TIMEOUT,
+ help='The number of seconds to wait before a connection times out. Default: %(default)s'
)
# parser.add_argument(
@@ -566,8 +673,8 @@ def add_arguments(parser):
)
parser.add_argument(
- '--cache-timeout', type=int, metavar='n', default=300,
- help='The cache timeout in seconds for the data retrieved from the Arch Linux Mirror Status API. The default is 300 (5 minutes).'
+ '--cache-timeout', type=int, metavar='n', default=DEFAULT_CACHE_TIMEOUT,
+ help='The cache timeout in seconds for the data retrieved from the Arch Linux Mirror Status API. The default is %(default)s.'
)
parser.add_argument(
@@ -575,15 +682,15 @@ def add_arguments(parser):
help='Save the mirrorlist to the given path.'
)
- sort_help = '; '.join('"{}": {}'.format(k, v) for k, v in MirrorStatus.SORT_TYPES.items())
+ sort_help = '; '.join('"{}": {}'.format(k, v) for k, v in SORT_TYPES.items())
parser.add_argument(
- '--sort', choices=MirrorStatus.SORT_TYPES,
+ '--sort', choices=SORT_TYPES,
help='Sort the mirrorlist. {}.'.format(sort_help)
)
parser.add_argument(
- '--threads', type=int, metavar='n',
- help='The number of threads to use when rating mirrors.'
+ '--threads', type=int, metavar='n', default=DEFAULT_N_THREADS,
+ help='The maximum number of threads to use when rating mirrors. Default: %(default)s'
)
parser.add_argument(
@@ -653,6 +760,21 @@ def add_arguments(parser):
help='Set the minimum completion percent for the returned mirrors. Check the mirrorstatus webpage for the meaning of this parameter. Default value: %(default)s.'
)
+ filters.add_argument(
+ '--isos', action='store_true',
+ help='Only return mirrors that host ISOs.'
+ )
+
+ filters.add_argument(
+ '--ipv4', action='store_true',
+ help='Only return mirrors that support IPv4.'
+ )
+
+ filters.add_argument(
+ '--ipv6', action='store_true',
+ help='Only return mirrors that support IPv6.'
+ )
+
return parser
@@ -680,7 +802,6 @@ def process_options(options, ms=None, mirrors=None):
'''
if not ms:
ms = MirrorStatus(
- verbose=options.verbose,
connection_timeout=options.connection_timeout,
# download_timeout=options.download_timeout,
cache_timeout=options.cache_timeout,
@@ -698,20 +819,23 @@ def process_options(options, ms=None, mirrors=None):
include=options.include,
exclude=options.exclude,
age=options.age,
- protocols=options.protocols
+ protocols=options.protocols,
+ isos=options.isos,
+ ipv4=options.ipv4,
+ ipv6=options.ipv6
)
if options.latest and options.latest > 0:
mirrors = ms.sort(mirrors, by='age')
- mirrors = mirrors[:options.latest]
+ mirrors = itertools.islice(mirrors, options.latest)
if options.score and options.score > 0:
mirrors = ms.sort(mirrors, by='score')
- mirrors = mirrors[:options.score]
+ mirrors = itertools.islice(mirrors, options.score)
if options.fastest and options.fastest > 0:
mirrors = ms.sort(mirrors, by='rate')
- mirrors = mirrors[:options.fastest]
+ mirrors = itertools.islice(mirrors, options.fastest)
if options.sort and not (options.sort == 'rate' and options.fastest):
mirrors = ms.sort(mirrors, by=options.sort)
@@ -723,8 +847,6 @@ def process_options(options, ms=None, mirrors=None):
-
-
def main(args=None, configure_logging=False):
if args:
cmd = tuple(args)
@@ -733,17 +855,25 @@ def main(args=None, configure_logging=False):
options = parse_args(args)
+ # Configure logging.
+ logger = get_logger()
+
if configure_logging:
if options.verbose:
level = logging.INFO
else:
level = logging.WARNING
- logging.basicConfig(
- format='[{asctime:s}] {levelname:s}: {message:s}',
+
+ logger.setLevel(level)
+ ch = logging.StreamHandler()
+ formatter = logging.Formatter(
+ fmt='[{asctime:s}] {levelname:s}: {message:s}',
style='{',
- datefmt='%Y-%m-%d %H:%M:%S',
- level=level
+ datefmt='%Y-%m-%d %H:%M:%S'
)
+ ch.setFormatter(formatter)
+ logger.addHandler(ch)
+
try:
ms, mirrors = process_options(options)