From c944bb0374256554698c3190b3470bfae70d21ba Mon Sep 17 00:00:00 2001 From: Erich Eckner Date: Thu, 21 Mar 2019 09:11:08 +0100 Subject: upstream version 2019 --- Reflector.py | 964 +++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 547 insertions(+), 417 deletions(-) (limited to 'Reflector.py') diff --git a/Reflector.py b/Reflector.py index b81f910..bcbb82f 100644 --- a/Reflector.py +++ b/Reflector.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -# Copyright (C) 2012, 2013 Xyne +# Copyright (C) 2012-2019 Xyne # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License @@ -22,6 +22,7 @@ import datetime import errno import getpass import http.client +import itertools import json import logging import os @@ -37,10 +38,287 @@ import time import urllib.error import urllib.request +################################## Constants ################################### +NAME = 'Reflector' + +URL = 'https://www.archlinux.org/mirrors/status/json/' + +DISPLAY_TIME_FORMAT = '%Y-%m-%d %H:%M:%S UTC' +PARSE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ' +PARSE_TIME_FORMAT_WITH_USEC = '%Y-%m-%dT%H:%M:%S.%fZ' + +DB_SUBPATH = 'core/os/x86_64/core.db' + +MIRROR_URL_FORMAT = '{0}{1}/os/{2}' +MIRRORLIST_ENTRY_FORMAT = "Server = " + MIRROR_URL_FORMAT + "\n" + +DEFAULT_CONNECTION_TIMEOUT = 5 +DEFAULT_CACHE_TIMEOUT = 300 +DEFAULT_N_THREADS = 5 + +SORT_TYPES = { + 'age' : 'last server synchronization', + 'rate' : 'download rate', + 'country': 'server\'s location', + 'score' : 'MirrorStatus score', + 'delay' : 'MirrorStatus delay', +} + + +################################# IO Functions ################################# + +def get_cache_file(): + ''' + Get a nearly XDG-compliant cache directory. PyXDG is not used to avoid the + external dependency. It is not fully compliant because it omits the + application name, but the mirror status file can be reused by other + applications and this stores no other files. + ''' + base_name = 'mirrorstatus.json' + cache_dir = os.getenv('XDG_CACHE_HOME', default=os.path.expanduser('~/.cache')) + try: + os.makedirs(cache_dir, exist_ok=True) + # Raised by makedirs if permissions do not match umask + except FileExistsError: + pass + return os.path.join(cache_dir, base_name) + + + +def get_mirrorstatus( + connection_timeout=DEFAULT_CONNECTION_TIMEOUT, + cache_timeout=DEFAULT_CACHE_TIMEOUT +): + ''' + Retrieve the mirror status JSON object. The downloaded data will be cached + locally and re-used within the cache timeout period. Returns the object and + the local cache's modification time. + ''' + cache_path = get_cache_file() + try: + mtime = os.path.getmtime(cache_path) + invalid = (time.time() - mtime) > cache_timeout + except FileNotFoundError: + mtime = None + invalid = True + + try: + if invalid: + with urllib.request.urlopen(URL, None, connection_timeout) as h: + obj = json.loads(h.read().decode()) + with open(cache_path, 'w') as h: + json.dump(obj, h, sort_keys=True, indent=2) + mtime = time.time() + else: + with open(cache_path, 'r') as h: + obj = json.load(h) + + return obj, mtime + except (IOError, urllib.error.URLError, socket.timeout) as e: + raise MirrorStatusError(str(e)) + + + +################################ Miscellaneous ################################# + +def get_logger(): + ''' + Get the logger used by this module. Use this to be sure that the right logger + is used. + ''' + return logging.getLogger(NAME) + + + +def format_last_sync(mirrors): + ''' + Parse and format the "last_sync" field. + ''' + for m in mirrors: + last_sync = calendar.timegm(time.strptime(m['last_sync'], PARSE_TIME_FORMAT)) + m.update(last_sync=last_sync) + yield m + + + +def count_countries(mirrors): + ''' + Count the mirrors in each country. + ''' + countries = dict() + for m in mirrors: + k = (m['country'], m['country_code']) + if not any(k): + continue + try: + countries[k] += 1 + except KeyError: + countries[k] = 1 + return countries + + + +################################### Sorting #################################### + +def sort(mirrors, by=None, n_threads=DEFAULT_N_THREADS): + ''' + Sort mirrors by different criteria. + ''' + # Ensure that "mirrors" is a list that can be sorted. + if not isinstance(mirrors, list): + mirrors = list(mirrors) + + if by == 'age': + mirrors.sort(key=lambda m: m['last_sync'], reverse=True) + + elif by == 'rate': + rates = rate(mirrors, n_threads=n_threads) + mirrors = sorted(mirrors, key=lambda m: rates[m['url']], reverse=True) + + else: + try: + mirrors.sort(key=lambda m: m[by]) + except KeyError: + raise MirrorStatusError('attempted to sort mirrors by unrecognized criterion: "{}"'.format(by)) + + return mirrors + + +#################################### Rating #################################### + +def rate_rsync(db_url, connection_timeout=DEFAULT_CONNECTION_TIMEOUT): + ''' + Download a database via rsync and return the time and rate of the download. + ''' + rsync_cmd = [ + 'rsync', + '-avL', '--no-h', '--no-motd', + '--contimeout={}'.format(connection_timeout), + db_url + ] + try: + with tempfile.TemporaryDirectory() as tmpdir: + t0 = time.time() + subprocess.check_call( + rsync_cmd + [tmpdir], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) + dt = time.time() - t0 + size = os.path.getsize( + os.path.join(tmpdir, os.path.basename(DB_SUBPATH)) + ) + r = size / dt + return dt, r + except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError): + return None, 0 + + + +def rate_http(db_url, connection_timeout=DEFAULT_CONNECTION_TIMEOUT): + ''' + Download a database via any protocol supported by urlopen and return the time + and rate of the download. + ''' + req = urllib.request.Request(url=db_url) + try: + with urllib.request.urlopen(req, None, connection_timeout) as f: + t0 = time.time() + size = len(f.read()) + dt = time.time() - t0 + r = size / (dt) + return dt, r + except (OSError, urllib.error.HTTPError, http.client.HTTPException): + return None, 0 + + + +def rate(mirrors, n_threads=DEFAULT_N_THREADS, connection_timeout=DEFAULT_CONNECTION_TIMEOUT): + ''' + Rate mirrors by timing the download the core repo's database for each one. + ''' + # Ensure that mirrors is not a generator so that its length can be determined. + if not isinstance(mirrors, tuple): + mirrors = tuple(mirrors) + + if not mirrors: + return None + + # At least 1 thread and not more than the number of mirrors. + n_threads = max(1, min(n_threads, len(mirrors))) + + # URL input queue. + q_in = queue.Queue() + # URL, elapsed time and rate output queue. + q_out = queue.Queue() + + + def worker(): + while True: + # To stop a thread, an integer will be inserted in the input queue. Each + # thread will increment it and re-insert it until it equals the + # threadcount. After encountering the integer, the thread exits the loop. + url = q_in.get() + + if isinstance(url, int): + if url < n_threads: + q_in.put(url + 1) + + else: + db_url = url + DB_SUBPATH + scheme = urllib.parse.urlparse(url).scheme + + if scheme == 'rsync': + dt, r = rate_rsync(db_url, connection_timeout) + else: + dt, r = rate_http(db_url, connection_timeout) + + q_out.put((url, dt, r)) + + q_in.task_done() + + + workers = tuple(threading.Thread(target=worker) for _ in range(n_threads)) + for w in workers: + w.daemon = True + w.start() + + url_len = max(len(m['url']) for m in mirrors) + logger = get_logger() + for m in mirrors: + url = m['url'] + logger.info("rating {}".format(url)) + q_in.put(url) + + # To exit the threads. + q_in.put(0) + q_in.join() + + header_fmt = '{{:{:d}s}} {{:>14s}} {{:>9s}}'.format(url_len) + logger.info(header_fmt.format('Server', 'Rate', 'Time')) + fmt = '{{:{:d}s}} {{:8.2f}} KiB/s {{:7.2f}} s'.format(url_len) + + # Loop over the mirrors just to ensure that we get the rate for each mirror. + # The value in the loop does not (necessarily) correspond to the mirror. + rates = dict() + for _ in mirrors: + url, dt, r = q_out.get() + kibps = r / 1024.0 + logger.info(fmt.format(url, kibps, dt)) + rates[url] = r + q_out.task_done() + + return rates + + + +############################## MirrorStatusError ############################### -# Generic MirrorStatus Exception class MirrorStatusError(Exception): + ''' + Common base exception raised by this module. + ''' def __init__(self, msg): self.msg = msg def __str__(self): @@ -48,44 +326,168 @@ class MirrorStatusError(Exception): -def get_cache_file(): - bname = 'mirrorstatus.json' - path = os.getenv('XDG_CACHE_HOME') - if path: - try: - os.makedirs(path, exist_ok=True) - # Raised if permissions do not match umask - except FileExistsError: - pass - return os.path.join(path, bname) + +############################## MirrorStatusFilter ############################## + +class MirrorStatusFilter(): + + def __init__( + self, + min_completion_pct=1.0, + countries=None, + protocols=None, + include=None, + exclude=None, + age=None, + isos=False, + ipv4=False, + ipv6=False + ): + self.min_completion_pct = min_completion_pct + self.countries = tuple(c.upper() for c in countries) if countries else None + self.protocols = protocols + self.include = tuple(re.compile(r) for r in include) if include else None + self.exclude = tuple(re.compile(r) for r in exclude) if exclude else None + self.age = age + self.isos = isos + self.ipv4 = ipv4 + self.ipv6 = ipv6 + + + def filter_mirrors(self, mirrors): + # Filter unsynced mirrors. + mirrors = (m for m in mirrors if m['last_sync']) + + # Parse the last sync time. + mirrors = format_last_sync(mirrors) + + # Filter by completion "percent" [0-1]. + mirrors = (m for m in mirrors if m['completion_pct'] >= self.min_completion_pct) + + # Filter by countries. + if self.countries: + mirrors = ( + m for m in mirrors + if m['country'].upper() in self.countries + or m['country_code'].upper() in self.countries + ) + + # Filter by protocols. + if self.protocols: + mirrors = (m for m in mirrors if m['protocol'] in self.protocols) + + # Filter by include expressions. + if self.include: + mirrors = (m for m in mirrors if any(r.search(m['url']) for r in self.include)) + + # Filter by exclude expressions. + if self.exclude: + mirrors = (m for m in mirrors if not any(r.search(m['url']) for r in self.exclude)) + + # Filter by age. The age is given in hours and converted to seconds. Servers + # with a last refresh older than the age are omitted. + if self.age and self.age > 0: + t = time.time() + a = self.age * 60**2 + mirrors = (m for m in mirrors if (m['last_sync'] + a) >= t) + + # The following does not work. Only the final iteration affects "mirrors". + # TODO: Understand exactly why the code above works but the loop doesn't. + # for field in ('isos', 'ipv4', 'ipv6'): + # if getattr(self, field): + # mirrors = (m for m in mirrors if m[field]) + + # Filter by ISO hosing. + if self.isos: + mirrors = (m for m in mirrors if m['isos']) + + # Filter by IPv4 support. + if self.ipv4: + mirrors = (m for m in mirrors if m['ipv4']) + + # Filter by IPv6 support. + if self.ipv6: + mirrors = (m for m in mirrors if m['ipv6']) + + yield from mirrors + + + +################################## Formatting ################################## + +def format_mirrorlist(mirror_status, mtime, include_country=False, command=None): + if command is None: + command = '?' else: - return '/tmp/.{}.{}'.format(getpass.getuser(), bname) - - - -class MirrorStatus(object): - # JSON URI - URL = 'https://www.archlinux.org/mirrors/status/json/' - # Mirror URL format. Accepts server base URL, repository, and architecture. - MIRROR_URL_FORMAT = '{0}{1}/os/{2}' - MIRRORLIST_ENTRY_FORMAT = "Server = " + MIRROR_URL_FORMAT + "\n" - DISPLAY_TIME_FORMAT = '%Y-%m-%d %H:%M:%S UTC' - PARSE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ' - # Required for the last_check field, which oddly includes microseconds. - PARSE_TIME_FORMAT_WITH_USEC = '%Y-%m-%dT%H:%M:%S.%fZ' - # Recognized list sort types and their descriptions. - SORT_TYPES = { - 'age' : 'last server synchronization', - 'rate' : 'download rate', - 'country': 'server\'s location', - 'score' : 'MirrorStatus score', - 'delay' : 'MirrorStatus delay', - } - # Known repositories, i.e. those that should be on each mirror. - # Used to replace the "$repo" variable. - # TODO - # Avoid using a hard-coded list. - # See https://bugs.archlinux.org/task/32895 + command = 'reflector ' + ' '.join(pipes.quote(x) for x in command) + + last_check = mirror_status['last_check'] + # For some reason the "last_check" field included microseconds. + try: + parsed_last_check = datetime.datetime.strptime( + last_check, + PARSE_TIME_FORMAT_WITH_USEC, + ).timetuple() + except ValueError: + parsed_last_check = datetime.datetime.strptime( + last_check, + PARSE_TIME_FORMAT, + ).timetuple() + + width = 80 + colw = 11 + header = '# Arch Linux mirrorlist generated by Reflector #'.center(width, '#') + border = '#' * len(header) + mirrorlist = '' + mirrorlist = '{}\n{}\n{}\n'.format(border, header, border) + \ + '\n' + \ + '\n'.join( + '# {{:<{:d}s}} {{}}'.format(colw).format(k, v) for k, v in ( + ('With:', command), + ('When:', time.strftime(DISPLAY_TIME_FORMAT, time.gmtime())), + ('From:', URL), + ('Retrieved:', time.strftime(DISPLAY_TIME_FORMAT, time.gmtime(mtime))), + ('Last Check:', time.strftime(DISPLAY_TIME_FORMAT, parsed_last_check)) + ) + ) + \ + '\n\n' + + country = None + + mirrors = mirror_status['urls'] + for mirror in mirrors: + # Include country tags. This is intended for lists that are sorted by + # country. + if include_country: + c = '{} [{}]'.format(mirror['country'], mirror['country_code']) + if c != country: + if country: + mirrorlist += '\n' + mirrorlist += '# {}\n'.format(c) + country = c + mirrorlist += MIRRORLIST_ENTRY_FORMAT.format(mirror['url'], '$repo', '$arch') + + if mirrors: + return mirrorlist + else: + return None + + + + + +############################ MirrorStatus Retriever ############################ + +class MirrorStatus(): + ''' + This is a legacy class that will likely be removed in the future. It + previously held most of this module's functionality until it was refactored + into more modular functions. Seemingly pointless code is still used by + importers of this module. + ''' + + # TODO: move these to another module or remove them completely + # Related: https://bugs.archlinux.org/task/32895 REPOSITORIES = ( 'community', 'community-staging', @@ -99,325 +501,85 @@ class MirrorStatus(object): 'staging', 'testing' ) - - # Known system architectures, as used to replace the "$arch" variable. + # Officially supported system architectures. ARCHITECTURES = ['x86_64'] - # Initialize - # refresh_interval: - # The cached list will be replaced after this many seconds have passed. - # 0 effectively disables caching. - # Caching is only useful if the object persists, e.g. if it were embedded - # in a server. + MIRROR_URL_FORMAT = MIRROR_URL_FORMAT + MIRRORLIST_ENTRY_FORMAT = MIRRORLIST_ENTRY_FORMAT + + def __init__( self, - refresh_interval=0, - verbose=False, - connection_timeout=5, -# download_timeout=None, - cache_timeout=300, - min_completion_pct=1., - threads=5 + connection_timeout=DEFAULT_CONNECTION_TIMEOUT, + cache_timeout=DEFAULT_CACHE_TIMEOUT, + min_completion_pct=1.0, + threads=DEFAULT_N_THREADS ): - self.refresh_interval = refresh_interval - - # Last modification time of the json object. - self.json_mtime = 0 - # The parsed JSON object. - self.json_obj = {} - # Display extra information. - self.verbose = verbose - # Connection timeout self.connection_timeout = connection_timeout - # Download timeout -# self.download_timeout = download_timeout - # Cache timeout self.cache_timeout = cache_timeout - # Minimum completion percent, for filtering mirrors. self.min_completion_pct = min_completion_pct - # Threads self.threads = threads + self.mirror_status = None + self.ms_mtime = 0 def retrieve(self): - """Retrieve the current mirror status JSON data.""" - self.json_obj = None - json_str = None - save_json = False - - cache_file = get_cache_file() - if self.cache_timeout > 0: - save_json = True - try: - mtime = os.path.getmtime(cache_file) - if time.time() - mtime < self.cache_timeout: - try: - with open(cache_file) as f: - self.json_obj = json.load(f) - self.json_mtime = mtime - save_json = False - except IOError as e: - raise MirrorStatusError('failed to load cached JSON data ({})'.format(e)) - except OSError as e: - if e.errno != errno.ENOENT: - raise MirrorStatusError('failed to get cache file mtime ({})'.format(e)) - - if not self.json_obj: - try: - with urllib.request.urlopen(MirrorStatus.URL, None, self.connection_timeout) as f: - json_str = f.read() - self.json_obj = json.loads(json_str.decode()) - self.json_mtime = time.time() - except (urllib.error.URLError, socket.timeout) as e: - raise MirrorStatusError('failed to retrieve mirror data: ({})'.format(e)) - except ValueError as e: - raise MirrorStatusError('failed to parse retrieved mirror data: ({})'.format(e)) - - try: - # Remove servers that have not synced, and parse the "last_sync" times for - # comparison later. - mirrors = self.json_obj['urls'] - # Filter incomplete mirrors and mirrors that haven't synced. - mirrors = list( - m for m in mirrors - if m['last_sync'] - and m['completion_pct'] >= self.min_completion_pct - ) - # Parse 'last_sync' times for future comparison. - for mirror in mirrors: - mirror['last_sync'] = calendar.timegm( - time.strptime(mirror['last_sync'], - MirrorStatus.PARSE_TIME_FORMAT) - ) - self.json_obj['urls'] = mirrors - except KeyError: - raise MirrorStatusError('failed to parse retrieved mirror data (the format may have changed or there may be a transient error)') - - if save_json and json_str: - try: - with open(cache_file, 'wb') as f: - f.write(json_str) - except IOError as e: - raise MirrorStatusError('failed to cache JSON data ({})'.format(e)) - - - + self.mirror_status, self.ms_mtime = get_mirrorstatus( + connection_timeout=self.connection_timeout, + cache_timeout=self.cache_timeout + ) def get_obj(self): - """Return the JSON object, retrieving new data if necessary.""" - if not self.json_obj \ - or time.time() > (self.json_mtime + self.refresh_interval): + ''' + Get the JSON mirror status. + ''' + t = time.time() + if (t - self.ms_mtime) > self.cache_timeout: self.retrieve() - - return self.json_obj + return self.mirror_status def get_mirrors(self): - """Get the mirrors.""" - return self.get_obj()['urls'] - + ''' + Get the mirror from the mirror status. + ''' + obj = self.get_obj() + try: + return obj['urls'] + except KeyError: + raise MirrorStatusError('no mirrors detected in mirror status output') - def filter( - self, - mirrors=None, - countries=None, - regexes=None, # TODO: remove - include=None, - exclude=None, - age=None, - protocols=None - ): - """Filter using different parameters.""" - # TODO: remove - if regexes: -# raise MirrorStatusError('The "regexes" keyword has been deprecated and replaced by "include" and "exclude".') - if not include: - include = regexes - sys.stderr.write('''WARNING: The "regexes" keyword has been deprecated and replaced by "include" and "exclude". - Support will be soon removed without further warning.''') - if mirrors is None: - mirrors = self.get_mirrors() - t = time.time() - n = 0 - - # Make country arguments case-insensitive. - uc_countries = tuple(c.upper() for c in countries) if countries else None - for mirror in mirrors: - # Filter by country. - if countries \ - and not ( \ - mirror['country'].upper() in uc_countries or \ - mirror['country_code'].upper() in uc_countries \ - ): - continue - # Filter by protocol. - if protocols and not mirror['protocol'] in protocols: - continue - # Filter by regex. - # TODO: Find a better way to do this. - if include: - for regex in include: - if re.search(regex, mirror['url']): - break - else: - continue - if exclude: - discard = False - for regex in exclude: - if re.search(regex, mirror['url']): - discard = True - break - if discard: - continue - # Filter by hours since last sync. - if age and t > (age * 60**2 + mirror['last_sync']): - continue - - # Yield if we're still here. - yield mirror - - - - def sort(self, mirrors=None, by=None): - """Sort using different parameters.""" + def filter(self, mirrors=None, **kwargs): + ''' + Filter mirrors by various criteria. + ''' if mirrors is None: mirrors = self.get_mirrors() - # Ensure that "mirrors" is a list that can be sorted. - if not isinstance(mirrors, list): - mirrors = list(mirrors) + msf = MirrorStatusFilter(min_completion_pct=self.min_completion_pct, **kwargs) + yield from msf.filter_mirrors(mirrors) - if by == 'age': - mirrors.sort(key=lambda m: m['last_sync'], reverse=True) - elif by == 'rate': - mirrors = self.rate(mirrors) - elif by in ('country', 'country_code', 'delay', 'score'): - mirrors.sort(key=lambda m: m[by]) - return mirrors - - # Sort mirrors by download speed. Download speed will be calculated from the - # download time of the [core] database from each server. - # TODO: Consider ways to improve this. - # TODO: Consider the effects of threading (do the threads affect the results - # by competing for bandwidth?) - def rate(self, mirrors=None, threads=5): + def sort(self, mirrors=None, **kwargs): + ''' + Sort mirrors by various criteria. + ''' if mirrors is None: mirrors = self.get_mirrors() - if not threads: - threads = self.threads - # Ensure that "mirrors" is a list and not a generator. - if not isinstance(mirrors, list): - mirrors = list(mirrors) - - if not mirrors: - logging.warning('no mirrors selected for rating') - return mirrors - - # Ensure a sane number of threads. - if threads < 1: - threads = 1 - else: - threads = min(threads, len(mirrors)) - - rates = {} - - # URL input queue.Queue - q_in = queue.Queue() - # URL and rate output queue.Queue - q_out = queue.Queue() - def worker(): - while True: - url = q_in.get() - db_subpath = 'core/os/x86_64/core.db' - db_url = url + db_subpath - scheme = urllib.parse.urlparse(url).scheme - # Leave the rate as 0 if the connection fails. - # TODO: Consider more graceful error handling. - rate = 0 - dt = float('NaN') - - # urllib cannot handle rsync protocol - if scheme == 'rsync': - rsync_cmd = [ - 'rsync', - '-avL', '--no-h', '--no-motd', - '--contimeout={}'.format(self.connection_timeout), - db_url - ] - try: - with tempfile.TemporaryDirectory() as tmpdir: - t0 = time.time() - subprocess.check_call( - rsync_cmd + [tmpdir], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL - ) - dt = time.time() - t0 - size = os.path.getsize(os.path.join( - tmpdir, - os.path.basename(db_subpath) - )) - rate = size / dt - except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError): - pass - else: - req = urllib.request.Request(url=db_url) - try: - t0 = time.time() - with urllib.request.urlopen(req, None, self.connection_timeout) as f: - size = len(f.read()) - dt = time.time() - t0 - rate = size / (dt) - except (OSError, urllib.error.HTTPError, http.client.HTTPException): - pass - q_out.put((url, rate, dt)) - q_in.task_done() - - # Launch threads - for i in range(threads): - t = threading.Thread(target=worker) - t.daemon = True - t.start() - - - # Load the input queue.Queue - url_len = max(len(m['url']) for m in mirrors) - for mirror in mirrors: - logging.info("rating {}".format(mirror['url'])) - q_in.put(mirror['url']) - - q_in.join() - - - # Get the results - # The "in mirrors" loop is just used to ensure that the right number of - # items is retrieved. + yield from sort(mirrors, n_threads=self.threads, **kwargs) - # Display some extra data. - header_fmt = '{{:{:d}s}} {{:>14s}} {{:>9s}}'.format(url_len) - logging.info(header_fmt.format('Server', 'Rate', 'Time')) - fmt = '{{:{:d}s}} {{:8.2f}} KiB/s {{:7.2f}} s'.format(url_len) - # Loop over the mirrors just to ensure that we get the rate for each mirror. - # The value in the loop does not (necessarily) correspond to the mirror. - for _ in mirrors: - url, rate, dt = q_out.get() - kibps = rate / 1024.0 - logging.info(fmt.format(url, kibps, dt)) - rates[url] = rate - q_out.task_done() - - - # Sort by rate. - rated_mirrors = [m for m in mirrors if rates[m['url']] > 0] - rated_mirrors.sort(key=lambda m: rates[m['url']], reverse=True) - - return rated_mirrors + [m for m in mirrors if rates[m['url']] == 0] + def rate(self, mirrors=None, **kwargs): + ''' + Sort mirrors by download speed. + ''' + if mirrors is None: + mirrors = self.get_mirrors() + yield from sort(mirrors, n_threads=self.threads, by='rate', **kwargs) @@ -427,85 +589,29 @@ class MirrorStatus(object): - - # Return a Pacman-formatted mirrorlist - # TODO: Reconsider the assumption that self.json_obj has been retrieved. def get_mirrorlist(self, mirrors=None, include_country=False, cmd=None): - if mirrors is None: - mirrors = self.get_mirrors() - if cmd is None: - cmd = '?' - else: - cmd = 'reflector ' + ' '.join(pipes.quote(x) for x in cmd) - - last_check = self.json_obj['last_check'] - # For some reason the "last_check" field included microseconds. - try: - parsed_last_check = datetime.datetime.strptime( - last_check, - self.PARSE_TIME_FORMAT_WITH_USEC, - ).timetuple() - except ValueError: - parsed_last_check = datetime.datetime.strptime( - last_check, - self.PARSE_TIME_FORMAT, - ).timetuple() - - width = 80 - colw = 11 - header = '# Arch Linux mirrorlist generated by Reflector #'.center(width, '#') - border = '#' * len(header) - mirrorlist = '{}\n{}\n{}\n'.format(border, header, border) + \ - '\n' + \ - '\n'.join( - '# {{:<{:d}s}} {{}}'.format(colw).format(k, v) for k, v in ( - ('With:', cmd), - ('When:', self.display_time(time.gmtime())), - ('From:', MirrorStatus.URL), - ('Retrieved:', self.display_time(time.gmtime(self.json_mtime))), - ('Last Check:', self.display_time(parsed_last_check)), - ) - ) + \ - '\n\n' - - country = None - - # mirrors may be a generator so "if mirrors" will not work - no_mirrors = True - for mirror in mirrors: - no_mirrors = False - # Include country tags. This is intended for lists that are sorted by - # country. - if include_country: - c = '{} [{}]'.format(mirror['country'], mirror['country_code']) - if c != country: - if country: - mirrorlist += '\n' - mirrorlist += '# {}\n'.format(c) - country = c - mirrorlist += MirrorStatus.MIRRORLIST_ENTRY_FORMAT.format(mirror['url'], '$repo', '$arch') - - if no_mirrors: - return None - else: - return mirrorlist + ''' + Get a Pacman-formatted mirrorlist. + ''' + obj = self.get_obj().copy() + if mirrors is not None: + if not isinstance(mirrors, list): + mirrors = list(mirrors) + obj['urls'] = mirrors + return format_mirrorlist(obj, self.ms_mtime, include_country=include_country, command=cmd) def list_countries(self): - countries = dict() - for m in self.get_mirrors(): - k = (m['country'], m['country_code']) - try: - countries[k] += 1 - except KeyError: - countries[k] = 1 - return countries - - + ''' + List countries along with a server count for each one. + ''' + mirrors = self.get_mirrors() + return count_countries(mirrors) +############################### argparse Actions ############################### class ListCountries(argparse.Action): ''' @@ -523,11 +629,12 @@ class ListCountries(argparse.Action): -def print_mirror_info(mirrors, time_fmt=MirrorStatus.DISPLAY_TIME_FORMAT): +def print_mirror_info(mirrors, time_fmt=DISPLAY_TIME_FORMAT): ''' Print information about each mirror to STDOUT. ''' if mirrors: + # mirrors = format_last_sync(mirrors) if not isinstance(mirrors, list): mirrors = list(mirrors) ks = sorted(k for k in mirrors[0].keys() if k != 'url') @@ -551,8 +658,8 @@ def add_arguments(parser): parser = argparse.ArgumentParser(description='retrieve and filter a list of the latest Arch Linux mirrors') parser.add_argument( - '--connection-timeout', type=int, metavar='n', default=5, - help='The number of seconds to wait before a connection times out.' + '--connection-timeout', type=int, metavar='n', default=DEFAULT_CONNECTION_TIMEOUT, + help='The number of seconds to wait before a connection times out. Default: %(default)s' ) # parser.add_argument( @@ -566,8 +673,8 @@ def add_arguments(parser): ) parser.add_argument( - '--cache-timeout', type=int, metavar='n', default=300, - help='The cache timeout in seconds for the data retrieved from the Arch Linux Mirror Status API. The default is 300 (5 minutes).' + '--cache-timeout', type=int, metavar='n', default=DEFAULT_CACHE_TIMEOUT, + help='The cache timeout in seconds for the data retrieved from the Arch Linux Mirror Status API. The default is %(default)s.' ) parser.add_argument( @@ -575,15 +682,15 @@ def add_arguments(parser): help='Save the mirrorlist to the given path.' ) - sort_help = '; '.join('"{}": {}'.format(k, v) for k, v in MirrorStatus.SORT_TYPES.items()) + sort_help = '; '.join('"{}": {}'.format(k, v) for k, v in SORT_TYPES.items()) parser.add_argument( - '--sort', choices=MirrorStatus.SORT_TYPES, + '--sort', choices=SORT_TYPES, help='Sort the mirrorlist. {}.'.format(sort_help) ) parser.add_argument( - '--threads', type=int, metavar='n', - help='The number of threads to use when rating mirrors.' + '--threads', type=int, metavar='n', default=DEFAULT_N_THREADS, + help='The maximum number of threads to use when rating mirrors. Default: %(default)s' ) parser.add_argument( @@ -653,6 +760,21 @@ def add_arguments(parser): help='Set the minimum completion percent for the returned mirrors. Check the mirrorstatus webpage for the meaning of this parameter. Default value: %(default)s.' ) + filters.add_argument( + '--isos', action='store_true', + help='Only return mirrors that host ISOs.' + ) + + filters.add_argument( + '--ipv4', action='store_true', + help='Only return mirrors that support IPv4.' + ) + + filters.add_argument( + '--ipv6', action='store_true', + help='Only return mirrors that support IPv6.' + ) + return parser @@ -680,7 +802,6 @@ def process_options(options, ms=None, mirrors=None): ''' if not ms: ms = MirrorStatus( - verbose=options.verbose, connection_timeout=options.connection_timeout, # download_timeout=options.download_timeout, cache_timeout=options.cache_timeout, @@ -698,20 +819,23 @@ def process_options(options, ms=None, mirrors=None): include=options.include, exclude=options.exclude, age=options.age, - protocols=options.protocols + protocols=options.protocols, + isos=options.isos, + ipv4=options.ipv4, + ipv6=options.ipv6 ) if options.latest and options.latest > 0: mirrors = ms.sort(mirrors, by='age') - mirrors = mirrors[:options.latest] + mirrors = itertools.islice(mirrors, options.latest) if options.score and options.score > 0: mirrors = ms.sort(mirrors, by='score') - mirrors = mirrors[:options.score] + mirrors = itertools.islice(mirrors, options.score) if options.fastest and options.fastest > 0: mirrors = ms.sort(mirrors, by='rate') - mirrors = mirrors[:options.fastest] + mirrors = itertools.islice(mirrors, options.fastest) if options.sort and not (options.sort == 'rate' and options.fastest): mirrors = ms.sort(mirrors, by=options.sort) @@ -723,8 +847,6 @@ def process_options(options, ms=None, mirrors=None): - - def main(args=None, configure_logging=False): if args: cmd = tuple(args) @@ -733,17 +855,25 @@ def main(args=None, configure_logging=False): options = parse_args(args) + # Configure logging. + logger = get_logger() + if configure_logging: if options.verbose: level = logging.INFO else: level = logging.WARNING - logging.basicConfig( - format='[{asctime:s}] {levelname:s}: {message:s}', + + logger.setLevel(level) + ch = logging.StreamHandler() + formatter = logging.Formatter( + fmt='[{asctime:s}] {levelname:s}: {message:s}', style='{', - datefmt='%Y-%m-%d %H:%M:%S', - level=level + datefmt='%Y-%m-%d %H:%M:%S' ) + ch.setFormatter(formatter) + logger.addHandler(ch) + try: ms, mirrors = process_options(options) -- cgit v1.2.3