lib64/python3.8/wsgiref/__init__.py 0000644 00000001113 15173037777 0013032 0 ustar 00 """wsgiref -- a WSGI (PEP 3333) Reference Library
Current Contents:
* util -- Miscellaneous useful functions and wrappers
* headers -- Manage response headers
* handlers -- base classes for server/gateway implementations
* simple_server -- a simple BaseHTTPServer that supports WSGI
* validate -- validation wrapper that sits between an app and a server
to detect errors in either
To-Do:
* cgi_gateway -- Run WSGI apps under CGI (pending a deployment standard)
* cgi_wrapper -- Run CGI apps under WSGI
* router -- a simple middleware component that handles URL traversal
"""
lib/python3.6/site-packages/pip/utils/__init__.py 0000644 00000066155 15173070155 0015664 0 ustar 00 from __future__ import absolute_import
from collections import deque
import contextlib
import errno
import io
import locale
# we have a submodule named 'logging' which would shadow this if we used the
# regular name:
import logging as std_logging
import re
import os
import posixpath
import shutil
import stat
import subprocess
import sys
import tarfile
import zipfile
from pip.exceptions import InstallationError
from pip.compat import console_to_str, expanduser, stdlib_pkgs
from pip.locations import (
site_packages, user_site, running_under_virtualenv, virtualenv_no_global,
write_delete_marker_file, distutils_scheme,
)
from pip._vendor import pkg_resources
from pip._vendor.six.moves import input
from pip._vendor.six import PY2
from pip._vendor.retrying import retry
if PY2:
from io import BytesIO as StringIO
else:
from io import StringIO
__all__ = ['rmtree', 'display_path', 'backup_dir',
'ask', 'splitext',
'format_size', 'is_installable_dir',
'is_svn_page', 'file_contents',
'split_leading_dir', 'has_leading_dir',
'normalize_path',
'renames', 'get_terminal_size', 'get_prog',
'unzip_file', 'untar_file', 'unpack_file', 'call_subprocess',
'captured_stdout', 'ensure_dir',
'ARCHIVE_EXTENSIONS', 'SUPPORTED_EXTENSIONS',
'get_installed_version']
logger = std_logging.getLogger(__name__)
BZ2_EXTENSIONS = ('.tar.bz2', '.tbz')
XZ_EXTENSIONS = ('.tar.xz', '.txz', '.tlz', '.tar.lz', '.tar.lzma')
ZIP_EXTENSIONS = ('.zip', '.whl')
TAR_EXTENSIONS = ('.tar.gz', '.tgz', '.tar')
ARCHIVE_EXTENSIONS = (
ZIP_EXTENSIONS + BZ2_EXTENSIONS + TAR_EXTENSIONS + XZ_EXTENSIONS)
SUPPORTED_EXTENSIONS = ZIP_EXTENSIONS + TAR_EXTENSIONS
try:
import bz2 # noqa
SUPPORTED_EXTENSIONS += BZ2_EXTENSIONS
except ImportError:
logger.debug('bz2 module is not available')
try:
# Only for Python 3.3+
import lzma # noqa
SUPPORTED_EXTENSIONS += XZ_EXTENSIONS
except ImportError:
logger.debug('lzma module is not available')
def import_or_raise(pkg_or_module_string, ExceptionType, *args, **kwargs):
try:
return __import__(pkg_or_module_string)
except ImportError:
raise ExceptionType(*args, **kwargs)
def ensure_dir(path):
"""os.path.makedirs without EEXIST."""
try:
os.makedirs(path)
except OSError as e:
if e.errno != errno.EEXIST:
raise
def get_prog():
try:
if os.path.basename(sys.argv[0]) in ('__main__.py', '-c'):
return "%s -m pip" % sys.executable
except (AttributeError, TypeError, IndexError):
pass
return 'pip'
# Retry every half second for up to 3 seconds
@retry(stop_max_delay=3000, wait_fixed=500)
def rmtree(dir, ignore_errors=False):
shutil.rmtree(dir, ignore_errors=ignore_errors,
onerror=rmtree_errorhandler)
def rmtree_errorhandler(func, path, exc_info):
"""On Windows, the files in .svn are read-only, so when rmtree() tries to
remove them, an exception is thrown. We catch that here, remove the
read-only attribute, and hopefully continue without problems."""
# if file type currently read only
if os.stat(path).st_mode & stat.S_IREAD:
# convert to read/write
os.chmod(path, stat.S_IWRITE)
# use the original function to repeat the operation
func(path)
return
else:
raise
def display_path(path):
"""Gives the display value for a given path, making it relative to cwd
if possible."""
path = os.path.normcase(os.path.abspath(path))
if sys.version_info[0] == 2:
path = path.decode(sys.getfilesystemencoding(), 'replace')
path = path.encode(sys.getdefaultencoding(), 'replace')
if path.startswith(os.getcwd() + os.path.sep):
path = '.' + path[len(os.getcwd()):]
return path
def backup_dir(dir, ext='.bak'):
"""Figure out the name of a directory to back up the given dir to
(adding .bak, .bak2, etc)"""
n = 1
extension = ext
while os.path.exists(dir + extension):
n += 1
extension = ext + str(n)
return dir + extension
def ask_path_exists(message, options):
for action in os.environ.get('PIP_EXISTS_ACTION', '').split():
if action in options:
return action
return ask(message, options)
def ask(message, options):
"""Ask the message interactively, with the given possible responses"""
while 1:
if os.environ.get('PIP_NO_INPUT'):
raise Exception(
'No input was expected ($PIP_NO_INPUT set); question: %s' %
message
)
response = input(message)
response = response.strip().lower()
if response not in options:
print(
'Your response (%r) was not one of the expected responses: '
'%s' % (response, ', '.join(options))
)
else:
return response
def format_size(bytes):
if bytes > 1000 * 1000:
return '%.1fMB' % (bytes / 1000.0 / 1000)
elif bytes > 10 * 1000:
return '%ikB' % (bytes / 1000)
elif bytes > 1000:
return '%.1fkB' % (bytes / 1000.0)
else:
return '%ibytes' % bytes
def is_installable_dir(path):
"""Return True if `path` is a directory containing a setup.py file."""
if not os.path.isdir(path):
return False
setup_py = os.path.join(path, 'setup.py')
if os.path.isfile(setup_py):
return True
return False
def is_svn_page(html):
"""
Returns true if the page appears to be the index page of an svn repository
"""
return (re.search(r'<title>[^<]*Revision \d+:', html) and
re.search(r'Powered by (?:<a[^>]*?>)?Subversion', html, re.I))
def file_contents(filename):
with open(filename, 'rb') as fp:
return fp.read().decode('utf-8')
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
"""Yield pieces of data from a file-like object until EOF."""
while True:
chunk = file.read(size)
if not chunk:
break
yield chunk
def split_leading_dir(path):
path = path.lstrip('/').lstrip('\\')
if '/' in path and (('\\' in path and path.find('/') < path.find('\\')) or
'\\' not in path):
return path.split('/', 1)
elif '\\' in path:
return path.split('\\', 1)
else:
return path, ''
def has_leading_dir(paths):
"""Returns true if all the paths have the same leading path name
(i.e., everything is in one subdirectory in an archive)"""
common_prefix = None
for path in paths:
prefix, rest = split_leading_dir(path)
if not prefix:
return False
elif common_prefix is None:
common_prefix = prefix
elif prefix != common_prefix:
return False
return True
def normalize_path(path, resolve_symlinks=True):
"""
Convert a path to its canonical, case-normalized, absolute version.
"""
path = expanduser(path)
if resolve_symlinks:
path = os.path.realpath(path)
else:
path = os.path.abspath(path)
return os.path.normcase(path)
def splitext(path):
"""Like os.path.splitext, but take off .tar too"""
base, ext = posixpath.splitext(path)
if base.lower().endswith('.tar'):
ext = base[-4:] + ext
base = base[:-4]
return base, ext
def renames(old, new):
"""Like os.renames(), but handles renaming across devices."""
# Implementation borrowed from os.renames().
head, tail = os.path.split(new)
if head and tail and not os.path.exists(head):
os.makedirs(head)
shutil.move(old, new)
head, tail = os.path.split(old)
if head and tail:
try:
os.removedirs(head)
except OSError:
pass
def is_local(path):
"""
Return True if path is within sys.prefix, if we're running in a virtualenv.
If we're not in a virtualenv, all paths are considered "local."
"""
if not running_under_virtualenv():
return True
return normalize_path(path).startswith(normalize_path(sys.prefix))
def dist_is_local(dist):
"""
Return True if given Distribution object is installed locally
(i.e. within current virtualenv).
Always True if we're not in a virtualenv.
"""
return is_local(dist_location(dist))
def dist_in_usersite(dist):
"""
Return True if given Distribution is installed in user site.
"""
norm_path = normalize_path(dist_location(dist))
return norm_path.startswith(normalize_path(user_site))
def dist_in_site_packages(dist):
"""
Return True if given Distribution is installed in
distutils.sysconfig.get_python_lib().
"""
return normalize_path(
dist_location(dist)
).startswith(normalize_path(site_packages))
def dist_in_install_path(dist):
"""
Return True if given Distribution is installed in
path matching distutils_scheme layout.
"""
norm_path = normalize_path(dist_location(dist))
return norm_path.startswith(normalize_path(
distutils_scheme("")['purelib'].split('python')[0]))
def dist_is_editable(dist):
"""Is distribution an editable install?"""
for path_item in sys.path:
egg_link = os.path.join(path_item, dist.project_name + '.egg-link')
if os.path.isfile(egg_link):
return True
return False
def get_installed_distributions(local_only=True,
skip=stdlib_pkgs,
include_editables=True,
editables_only=False,
user_only=False):
"""
Return a list of installed Distribution objects.
If ``local_only`` is True (default), only return installations
local to the current virtualenv, if in a virtualenv.
``skip`` argument is an iterable of lower-case project names to
ignore; defaults to stdlib_pkgs
If ``editables`` is False, don't report editables.
If ``editables_only`` is True , only report editables.
If ``user_only`` is True , only report installations in the user
site directory.
"""
if local_only:
local_test = dist_is_local
else:
def local_test(d):
return True
if include_editables:
def editable_test(d):
return True
else:
def editable_test(d):
return not dist_is_editable(d)
if editables_only:
def editables_only_test(d):
return dist_is_editable(d)
else:
def editables_only_test(d):
return True
if user_only:
user_test = dist_in_usersite
else:
def user_test(d):
return True
return [d for d in pkg_resources.working_set
if local_test(d) and
d.key not in skip and
editable_test(d) and
editables_only_test(d) and
user_test(d)
]
def egg_link_path(dist):
"""
Return the path for the .egg-link file if it exists, otherwise, None.
There's 3 scenarios:
1) not in a virtualenv
try to find in site.USER_SITE, then site_packages
2) in a no-global virtualenv
try to find in site_packages
3) in a yes-global virtualenv
try to find in site_packages, then site.USER_SITE
(don't look in global location)
For #1 and #3, there could be odd cases, where there's an egg-link in 2
locations.
This method will just return the first one found.
"""
sites = []
if running_under_virtualenv():
if virtualenv_no_global():
sites.append(site_packages)
else:
sites.append(site_packages)
if user_site:
sites.append(user_site)
else:
if user_site:
sites.append(user_site)
sites.append(site_packages)
for site in sites:
egglink = os.path.join(site, dist.project_name) + '.egg-link'
if os.path.isfile(egglink):
return egglink
def dist_location(dist):
"""
Get the site-packages location of this distribution. Generally
this is dist.location, except in the case of develop-installed
packages, where dist.location is the source code location, and we
want to know where the egg-link file is.
"""
egg_link = egg_link_path(dist)
if egg_link:
return egg_link
return dist.location
def get_terminal_size():
"""Returns a tuple (x, y) representing the width(x) and the height(x)
in characters of the terminal window."""
def ioctl_GWINSZ(fd):
try:
import fcntl
import termios
import struct
cr = struct.unpack(
'hh',
fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')
)
except:
return None
if cr == (0, 0):
return None
return cr
cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
if not cr:
try:
fd = os.open(os.ctermid(), os.O_RDONLY)
cr = ioctl_GWINSZ(fd)
os.close(fd)
except:
pass
if not cr:
cr = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
return int(cr[1]), int(cr[0])
def current_umask():
"""Get the current umask which involves having to set it temporarily."""
mask = os.umask(0)
os.umask(mask)
return mask
def unzip_file(filename, location, flatten=True):
"""
Unzip the file (with path `filename`) to the destination `location`. All
files are written based on system defaults and umask (i.e. permissions are
not preserved), except that regular file members with any execute
permissions (user, group, or world) have "chmod +x" applied after being
written. Note that for windows, any execute changes using os.chmod are
no-ops per the python docs.
"""
ensure_dir(location)
zipfp = open(filename, 'rb')
try:
zip = zipfile.ZipFile(zipfp, allowZip64=True)
leading = has_leading_dir(zip.namelist()) and flatten
for info in zip.infolist():
name = info.filename
data = zip.read(name)
fn = name
if leading:
fn = split_leading_dir(name)[1]
fn = os.path.join(location, fn)
dir = os.path.dirname(fn)
if fn.endswith('/') or fn.endswith('\\'):
# A directory
ensure_dir(fn)
else:
ensure_dir(dir)
fp = open(fn, 'wb')
try:
fp.write(data)
finally:
fp.close()
mode = info.external_attr >> 16
# if mode and regular file and any execute permissions for
# user/group/world?
if mode and stat.S_ISREG(mode) and mode & 0o111:
# make dest file have execute for user/group/world
# (chmod +x) no-op on windows per python docs
os.chmod(fn, (0o777 - current_umask() | 0o111))
finally:
zipfp.close()
def untar_file(filename, location):
"""
Untar the file (with path `filename`) to the destination `location`.
All files are written based on system defaults and umask (i.e. permissions
are not preserved), except that regular file members with any execute
permissions (user, group, or world) have "chmod +x" applied after being
written. Note that for windows, any execute changes using os.chmod are
no-ops per the python docs.
"""
ensure_dir(location)
if filename.lower().endswith('.gz') or filename.lower().endswith('.tgz'):
mode = 'r:gz'
elif filename.lower().endswith(BZ2_EXTENSIONS):
mode = 'r:bz2'
elif filename.lower().endswith(XZ_EXTENSIONS):
mode = 'r:xz'
elif filename.lower().endswith('.tar'):
mode = 'r'
else:
logger.warning(
'Cannot determine compression type for file %s', filename,
)
mode = 'r:*'
tar = tarfile.open(filename, mode)
try:
# note: python<=2.5 doesn't seem to know about pax headers, filter them
leading = has_leading_dir([
member.name for member in tar.getmembers()
if member.name != 'pax_global_header'
])
for member in tar.getmembers():
fn = member.name
if fn == 'pax_global_header':
continue
if leading:
fn = split_leading_dir(fn)[1]
path = os.path.join(location, fn)
# Call the `data` filter for its side effect (raising exception)
try:
tarfile.data_filter(member.replace(name=fn), location)
except tarfile.LinkOutsideDestinationError:
pass
if member.isdir():
ensure_dir(path)
elif member.issym():
try:
tar._extract_member(member, path)
except Exception as exc:
# Some corrupt tar files seem to produce this
# (specifically bad symlinks)
logger.warning(
'In the tar file %s the member %s is invalid: %s',
filename, member.name, exc,
)
continue
else:
try:
fp = tar.extractfile(member)
except (KeyError, AttributeError) as exc:
# Some corrupt tar files seem to produce this
# (specifically bad symlinks)
logger.warning(
'In the tar file %s the member %s is invalid: %s',
filename, member.name, exc,
)
continue
ensure_dir(os.path.dirname(path))
with open(path, 'wb') as destfp:
shutil.copyfileobj(fp, destfp)
fp.close()
# Update the timestamp (useful for cython compiled files)
tar.utime(member, path)
# member have any execute permissions for user/group/world?
if member.mode & 0o111:
# make dest file have execute for user/group/world
# no-op on windows per python docs
os.chmod(path, (0o777 - current_umask() | 0o111))
finally:
tar.close()
def unpack_file(filename, location, content_type, link):
filename = os.path.realpath(filename)
if (content_type == 'application/zip' or
filename.lower().endswith(ZIP_EXTENSIONS) or
zipfile.is_zipfile(filename)):
unzip_file(
filename,
location,
flatten=not filename.endswith('.whl')
)
elif (content_type == 'application/x-gzip' or
tarfile.is_tarfile(filename) or
filename.lower().endswith(
TAR_EXTENSIONS + BZ2_EXTENSIONS + XZ_EXTENSIONS)):
untar_file(filename, location)
elif (content_type and content_type.startswith('text/html') and
is_svn_page(file_contents(filename))):
# We don't really care about this
from pip.vcs.subversion import Subversion
Subversion('svn+' + link.url).unpack(location)
else:
# FIXME: handle?
# FIXME: magic signatures?
logger.critical(
'Cannot unpack file %s (downloaded from %s, content-type: %s); '
'cannot detect archive format',
filename, location, content_type,
)
raise InstallationError(
'Cannot determine archive format of %s' % location
)
def call_subprocess(cmd, show_stdout=True, cwd=None,
on_returncode='raise',
command_desc=None,
extra_environ=None, spinner=None):
# This function's handling of subprocess output is confusing and I
# previously broke it terribly, so as penance I will write a long comment
# explaining things.
#
# The obvious thing that affects output is the show_stdout=
# kwarg. show_stdout=True means, let the subprocess write directly to our
# stdout. Even though it is nominally the default, it is almost never used
# inside pip (and should not be used in new code without a very good
# reason); as of 2016-02-22 it is only used in a few places inside the VCS
# wrapper code. Ideally we should get rid of it entirely, because it
# creates a lot of complexity here for a rarely used feature.
#
# Most places in pip set show_stdout=False. What this means is:
# - We connect the child stdout to a pipe, which we read.
# - By default, we hide the output but show a spinner -- unless the
# subprocess exits with an error, in which case we show the output.
# - If the --verbose option was passed (= loglevel is DEBUG), then we show
# the output unconditionally. (But in this case we don't want to show
# the output a second time if it turns out that there was an error.)
#
# stderr is always merged with stdout (even if show_stdout=True).
if show_stdout:
stdout = None
else:
stdout = subprocess.PIPE
if command_desc is None:
cmd_parts = []
for part in cmd:
if ' ' in part or '\n' in part or '"' in part or "'" in part:
part = '"%s"' % part.replace('"', '\\"')
cmd_parts.append(part)
command_desc = ' '.join(cmd_parts)
logger.debug("Running command %s", command_desc)
env = os.environ.copy()
if extra_environ:
env.update(extra_environ)
try:
proc = subprocess.Popen(
cmd, stderr=subprocess.STDOUT, stdin=None, stdout=stdout,
cwd=cwd, env=env)
except Exception as exc:
logger.critical(
"Error %s while executing command %s", exc, command_desc,
)
raise
if stdout is not None:
all_output = []
while True:
line = console_to_str(proc.stdout.readline())
if not line:
break
line = line.rstrip()
all_output.append(line + '\n')
if logger.getEffectiveLevel() <= std_logging.DEBUG:
# Show the line immediately
logger.debug(line)
else:
# Update the spinner
if spinner is not None:
spinner.spin()
proc.wait()
if spinner is not None:
if proc.returncode:
spinner.finish("error")
else:
spinner.finish("done")
if proc.returncode:
if on_returncode == 'raise':
if (logger.getEffectiveLevel() > std_logging.DEBUG and
not show_stdout):
logger.info(
'Complete output from command %s:', command_desc,
)
logger.info(
''.join(all_output) +
'\n----------------------------------------'
)
raise InstallationError(
'Command "%s" failed with error code %s in %s'
% (command_desc, proc.returncode, cwd))
elif on_returncode == 'warn':
logger.warning(
'Command "%s" had error code %s in %s',
command_desc, proc.returncode, cwd,
)
elif on_returncode == 'ignore':
pass
else:
raise ValueError('Invalid value: on_returncode=%s' %
repr(on_returncode))
if not show_stdout:
return ''.join(all_output)
def read_text_file(filename):
"""Return the contents of *filename*.
Try to decode the file contents with utf-8, the preferred system encoding
(e.g., cp1252 on some Windows machines), and latin1, in that order.
Decoding a byte string with latin1 will never raise an error. In the worst
case, the returned string will contain some garbage characters.
"""
with open(filename, 'rb') as fp:
data = fp.read()
encodings = ['utf-8', locale.getpreferredencoding(False), 'latin1']
for enc in encodings:
try:
data = data.decode(enc)
except UnicodeDecodeError:
continue
break
assert type(data) != bytes # Latin1 should have worked.
return data
def _make_build_dir(build_dir):
os.makedirs(build_dir)
write_delete_marker_file(build_dir)
class FakeFile(object):
"""Wrap a list of lines in an object with readline() to make
ConfigParser happy."""
def __init__(self, lines):
self._gen = (l for l in lines)
def readline(self):
try:
try:
return next(self._gen)
except NameError:
return self._gen.next()
except StopIteration:
return ''
def __iter__(self):
return self._gen
class StreamWrapper(StringIO):
@classmethod
def from_stream(cls, orig_stream):
cls.orig_stream = orig_stream
return cls()
# compileall.compile_dir() needs stdout.encoding to print to stdout
@property
def encoding(self):
return self.orig_stream.encoding
@contextlib.contextmanager
def captured_output(stream_name):
"""Return a context manager used by captured_stdout/stdin/stderr
that temporarily replaces the sys stream *stream_name* with a StringIO.
Taken from Lib/support/__init__.py in the CPython repo.
"""
orig_stdout = getattr(sys, stream_name)
setattr(sys, stream_name, StreamWrapper.from_stream(orig_stdout))
try:
yield getattr(sys, stream_name)
finally:
setattr(sys, stream_name, orig_stdout)
def captured_stdout():
"""Capture the output of sys.stdout:
with captured_stdout() as stdout:
print('hello')
self.assertEqual(stdout.getvalue(), 'hello\n')
Taken from Lib/support/__init__.py in the CPython repo.
"""
return captured_output('stdout')
class cached_property(object):
"""A property that is only computed once per instance and then replaces
itself with an ordinary attribute. Deleting the attribute resets the
property.
Source: https://github.com/bottlepy/bottle/blob/0.11.5/bottle.py#L175
"""
def __init__(self, func):
self.__doc__ = getattr(func, '__doc__')
self.func = func
def __get__(self, obj, cls):
if obj is None:
# We're being accessed from the class itself, not from an object
return self
value = obj.__dict__[self.func.__name__] = self.func(obj)
return value
def get_installed_version(dist_name, lookup_dirs=None):
"""Get the installed version of dist_name avoiding pkg_resources cache"""
# Create a requirement that we'll look for inside of setuptools.
req = pkg_resources.Requirement.parse(dist_name)
# We want to avoid having this cached, so we need to construct a new
# working set each time.
if lookup_dirs is None:
working_set = pkg_resources.WorkingSet()
else:
working_set = pkg_resources.WorkingSet(lookup_dirs)
# Get the installed distribution from our working set
dist = working_set.find(req)
# Check to see if we got an installed distribution or not, if we did
# we want to return it's version.
return dist.version if dist else None
def consume(iterator):
"""Consume an iterable at C speed."""
deque(iterator, maxlen=0)
lib64/python3.6/html/__init__.py 0000644 00000011224 15173074707 0012324 0 ustar 00 """
General functions for HTML manipulation.
"""
import re as _re
from html.entities import html5 as _html5
__all__ = ['escape', 'unescape']
def escape(s, quote=True):
"""
Replace special characters "&", "<" and ">" to HTML-safe sequences.
If the optional flag quote is true (the default), the quotation mark
characters, both double quote (") and single quote (') characters are also
translated.
"""
s = s.replace("&", "&") # Must be done first!
s = s.replace("<", "<")
s = s.replace(">", ">")
if quote:
s = s.replace('"', """)
s = s.replace('\'', "'")
return s
# see http://www.w3.org/TR/html5/syntax.html#tokenizing-character-references
_invalid_charrefs = {
0x00: '\ufffd', # REPLACEMENT CHARACTER
0x0d: '\r', # CARRIAGE RETURN
0x80: '\u20ac', # EURO SIGN
0x81: '\x81', # <control>
0x82: '\u201a', # SINGLE LOW-9 QUOTATION MARK
0x83: '\u0192', # LATIN SMALL LETTER F WITH HOOK
0x84: '\u201e', # DOUBLE LOW-9 QUOTATION MARK
0x85: '\u2026', # HORIZONTAL ELLIPSIS
0x86: '\u2020', # DAGGER
0x87: '\u2021', # DOUBLE DAGGER
0x88: '\u02c6', # MODIFIER LETTER CIRCUMFLEX ACCENT
0x89: '\u2030', # PER MILLE SIGN
0x8a: '\u0160', # LATIN CAPITAL LETTER S WITH CARON
0x8b: '\u2039', # SINGLE LEFT-POINTING ANGLE QUOTATION MARK
0x8c: '\u0152', # LATIN CAPITAL LIGATURE OE
0x8d: '\x8d', # <control>
0x8e: '\u017d', # LATIN CAPITAL LETTER Z WITH CARON
0x8f: '\x8f', # <control>
0x90: '\x90', # <control>
0x91: '\u2018', # LEFT SINGLE QUOTATION MARK
0x92: '\u2019', # RIGHT SINGLE QUOTATION MARK
0x93: '\u201c', # LEFT DOUBLE QUOTATION MARK
0x94: '\u201d', # RIGHT DOUBLE QUOTATION MARK
0x95: '\u2022', # BULLET
0x96: '\u2013', # EN DASH
0x97: '\u2014', # EM DASH
0x98: '\u02dc', # SMALL TILDE
0x99: '\u2122', # TRADE MARK SIGN
0x9a: '\u0161', # LATIN SMALL LETTER S WITH CARON
0x9b: '\u203a', # SINGLE RIGHT-POINTING ANGLE QUOTATION MARK
0x9c: '\u0153', # LATIN SMALL LIGATURE OE
0x9d: '\x9d', # <control>
0x9e: '\u017e', # LATIN SMALL LETTER Z WITH CARON
0x9f: '\u0178', # LATIN CAPITAL LETTER Y WITH DIAERESIS
}
_invalid_codepoints = {
# 0x0001 to 0x0008
0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8,
# 0x000E to 0x001F
0xe, 0xf, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19,
0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f,
# 0x007F to 0x009F
0x7f, 0x80, 0x81, 0x82, 0x83, 0x84, 0x85, 0x86, 0x87, 0x88, 0x89, 0x8a,
0x8b, 0x8c, 0x8d, 0x8e, 0x8f, 0x90, 0x91, 0x92, 0x93, 0x94, 0x95, 0x96,
0x97, 0x98, 0x99, 0x9a, 0x9b, 0x9c, 0x9d, 0x9e, 0x9f,
# 0xFDD0 to 0xFDEF
0xfdd0, 0xfdd1, 0xfdd2, 0xfdd3, 0xfdd4, 0xfdd5, 0xfdd6, 0xfdd7, 0xfdd8,
0xfdd9, 0xfdda, 0xfddb, 0xfddc, 0xfddd, 0xfdde, 0xfddf, 0xfde0, 0xfde1,
0xfde2, 0xfde3, 0xfde4, 0xfde5, 0xfde6, 0xfde7, 0xfde8, 0xfde9, 0xfdea,
0xfdeb, 0xfdec, 0xfded, 0xfdee, 0xfdef,
# others
0xb, 0xfffe, 0xffff, 0x1fffe, 0x1ffff, 0x2fffe, 0x2ffff, 0x3fffe, 0x3ffff,
0x4fffe, 0x4ffff, 0x5fffe, 0x5ffff, 0x6fffe, 0x6ffff, 0x7fffe, 0x7ffff,
0x8fffe, 0x8ffff, 0x9fffe, 0x9ffff, 0xafffe, 0xaffff, 0xbfffe, 0xbffff,
0xcfffe, 0xcffff, 0xdfffe, 0xdffff, 0xefffe, 0xeffff, 0xffffe, 0xfffff,
0x10fffe, 0x10ffff
}
def _replace_charref(s):
s = s.group(1)
if s[0] == '#':
# numeric charref
if s[1] in 'xX':
num = int(s[2:].rstrip(';'), 16)
else:
num = int(s[1:].rstrip(';'))
if num in _invalid_charrefs:
return _invalid_charrefs[num]
if 0xD800 <= num <= 0xDFFF or num > 0x10FFFF:
return '\uFFFD'
if num in _invalid_codepoints:
return ''
return chr(num)
else:
# named charref
if s in _html5:
return _html5[s]
# find the longest matching name (as defined by the standard)
for x in range(len(s)-1, 1, -1):
if s[:x] in _html5:
return _html5[s[:x]] + s[x:]
else:
return '&' + s
_charref = _re.compile(r'&(#[0-9]+;?'
r'|#[xX][0-9a-fA-F]+;?'
r'|[^\t\n\f <&#;]{1,32};?)')
def unescape(s):
"""
Convert all named and numeric character references (e.g. >, >,
&x3e;) in the string s to the corresponding unicode characters.
This function uses the rules defined by the HTML 5 standard
for both valid and invalid character references, and the list of
HTML 5 named character references defined in html.entities.html5.
"""
if '&' not in s:
return s
return _charref.sub(_replace_charref, s)
lib/python2.7/site-packages/pkg_resources/__init__.py 0000644 00000312177 15173104514 0016602 0 ustar 00 # coding: utf-8
"""
Package resource API
--------------------
A resource is a logical file contained within a package, or a logical
subdirectory thereof. The package resource API expects resource names
to have their path parts separated with ``/``, *not* whatever the local
path separator is. Do not use os.path operations to manipulate resource
names being passed into the API.
The package resource API is designed to work with normal filesystem packages,
.egg files, and unpacked .egg files. It can also work in a limited way with
.zip files and with custom PEP 302 loaders that support the ``get_data()``
method.
"""
from __future__ import absolute_import
import sys
import os
import io
import time
import re
import types
import zipfile
import zipimport
import warnings
import stat
import functools
import pkgutil
import operator
import platform
import collections
import plistlib
import email.parser
import errno
import tempfile
import textwrap
import itertools
import inspect
from pkgutil import get_importer
try:
import _imp
except ImportError:
# Python 3.2 compatibility
import imp as _imp
from pkg_resources.extern import six
from pkg_resources.extern.six.moves import urllib, map, filter
# capture these to bypass sandboxing
from os import utime
try:
from os import mkdir, rename, unlink
WRITE_SUPPORT = True
except ImportError:
# no write support, probably under GAE
WRITE_SUPPORT = False
from os import open as os_open
from os.path import isdir, split
try:
import importlib.machinery as importlib_machinery
# access attribute to force import under delayed import mechanisms.
importlib_machinery.__name__
except ImportError:
importlib_machinery = None
from . import py31compat
from pkg_resources.extern import appdirs
from pkg_resources.extern import packaging
__import__('pkg_resources.extern.packaging.version')
__import__('pkg_resources.extern.packaging.specifiers')
__import__('pkg_resources.extern.packaging.requirements')
__import__('pkg_resources.extern.packaging.markers')
if (3, 0) < sys.version_info < (3, 3):
raise RuntimeError("Python 3.3 or later is required")
if six.PY2:
# Those builtin exceptions are only defined in Python 3
PermissionError = None
NotADirectoryError = None
# declare some globals that will be defined later to
# satisfy the linters.
require = None
working_set = None
add_activation_listener = None
resources_stream = None
cleanup_resources = None
resource_dir = None
resource_stream = None
set_extraction_path = None
resource_isdir = None
resource_string = None
iter_entry_points = None
resource_listdir = None
resource_filename = None
resource_exists = None
_distribution_finders = None
_namespace_handlers = None
_namespace_packages = None
class PEP440Warning(RuntimeWarning):
"""
Used when there is an issue with a version or specifier not complying with
PEP 440.
"""
def parse_version(v):
try:
return packaging.version.Version(v)
except packaging.version.InvalidVersion:
return packaging.version.LegacyVersion(v)
_state_vars = {}
def _declare_state(vartype, **kw):
globals().update(kw)
_state_vars.update(dict.fromkeys(kw, vartype))
def __getstate__():
state = {}
g = globals()
for k, v in _state_vars.items():
state[k] = g['_sget_' + v](g[k])
return state
def __setstate__(state):
g = globals()
for k, v in state.items():
g['_sset_' + _state_vars[k]](k, g[k], v)
return state
def _sget_dict(val):
return val.copy()
def _sset_dict(key, ob, state):
ob.clear()
ob.update(state)
def _sget_object(val):
return val.__getstate__()
def _sset_object(key, ob, state):
ob.__setstate__(state)
_sget_none = _sset_none = lambda *args: None
def get_supported_platform():
"""Return this platform's maximum compatible version.
distutils.util.get_platform() normally reports the minimum version
of Mac OS X that would be required to *use* extensions produced by
distutils. But what we want when checking compatibility is to know the
version of Mac OS X that we are *running*. To allow usage of packages that
explicitly require a newer version of Mac OS X, we must also know the
current version of the OS.
If this condition occurs for any other platform with a version in its
platform strings, this function should be extended accordingly.
"""
plat = get_build_platform()
m = macosVersionString.match(plat)
if m is not None and sys.platform == "darwin":
try:
plat = 'macosx-%s-%s' % ('.'.join(_macosx_vers()[:2]), m.group(3))
except ValueError:
# not Mac OS X
pass
return plat
__all__ = [
# Basic resource access and distribution/entry point discovery
'require', 'run_script', 'get_provider', 'get_distribution',
'load_entry_point', 'get_entry_map', 'get_entry_info',
'iter_entry_points',
'resource_string', 'resource_stream', 'resource_filename',
'resource_listdir', 'resource_exists', 'resource_isdir',
# Environmental control
'declare_namespace', 'working_set', 'add_activation_listener',
'find_distributions', 'set_extraction_path', 'cleanup_resources',
'get_default_cache',
# Primary implementation classes
'Environment', 'WorkingSet', 'ResourceManager',
'Distribution', 'Requirement', 'EntryPoint',
# Exceptions
'ResolutionError', 'VersionConflict', 'DistributionNotFound',
'UnknownExtra', 'ExtractionError',
# Warnings
'PEP440Warning',
# Parsing functions and string utilities
'parse_requirements', 'parse_version', 'safe_name', 'safe_version',
'get_platform', 'compatible_platforms', 'yield_lines', 'split_sections',
'safe_extra', 'to_filename', 'invalid_marker', 'evaluate_marker',
# filesystem utilities
'ensure_directory', 'normalize_path',
# Distribution "precedence" constants
'EGG_DIST', 'BINARY_DIST', 'SOURCE_DIST', 'CHECKOUT_DIST', 'DEVELOP_DIST',
# "Provider" interfaces, implementations, and registration/lookup APIs
'IMetadataProvider', 'IResourceProvider', 'FileMetadata',
'PathMetadata', 'EggMetadata', 'EmptyProvider', 'empty_provider',
'NullProvider', 'EggProvider', 'DefaultProvider', 'ZipProvider',
'register_finder', 'register_namespace_handler', 'register_loader_type',
'fixup_namespace_packages', 'get_importer',
# Deprecated/backward compatibility only
'run_main', 'AvailableDistributions',
]
class ResolutionError(Exception):
"""Abstract base for dependency resolution errors"""
def __repr__(self):
return self.__class__.__name__ + repr(self.args)
class VersionConflict(ResolutionError):
"""
An already-installed version conflicts with the requested version.
Should be initialized with the installed Distribution and the requested
Requirement.
"""
_template = "{self.dist} is installed but {self.req} is required"
@property
def dist(self):
return self.args[0]
@property
def req(self):
return self.args[1]
def report(self):
return self._template.format(**locals())
def with_context(self, required_by):
"""
If required_by is non-empty, return a version of self that is a
ContextualVersionConflict.
"""
if not required_by:
return self
args = self.args + (required_by,)
return ContextualVersionConflict(*args)
class ContextualVersionConflict(VersionConflict):
"""
A VersionConflict that accepts a third parameter, the set of the
requirements that required the installed Distribution.
"""
_template = VersionConflict._template + ' by {self.required_by}'
@property
def required_by(self):
return self.args[2]
class DistributionNotFound(ResolutionError):
"""A requested distribution was not found"""
_template = ("The '{self.req}' distribution was not found "
"and is required by {self.requirers_str}")
@property
def req(self):
return self.args[0]
@property
def requirers(self):
return self.args[1]
@property
def requirers_str(self):
if not self.requirers:
return 'the application'
return ', '.join(self.requirers)
def report(self):
return self._template.format(**locals())
def __str__(self):
return self.report()
class UnknownExtra(ResolutionError):
"""Distribution doesn't have an "extra feature" of the given name"""
_provider_factories = {}
PY_MAJOR = sys.version[:3]
EGG_DIST = 3
BINARY_DIST = 2
SOURCE_DIST = 1
CHECKOUT_DIST = 0
DEVELOP_DIST = -1
def register_loader_type(loader_type, provider_factory):
"""Register `provider_factory` to make providers for `loader_type`
`loader_type` is the type or class of a PEP 302 ``module.__loader__``,
and `provider_factory` is a function that, passed a *module* object,
returns an ``IResourceProvider`` for that module.
"""
_provider_factories[loader_type] = provider_factory
def get_provider(moduleOrReq):
"""Return an IResourceProvider for the named module or requirement"""
if isinstance(moduleOrReq, Requirement):
return working_set.find(moduleOrReq) or require(str(moduleOrReq))[0]
try:
module = sys.modules[moduleOrReq]
except KeyError:
__import__(moduleOrReq)
module = sys.modules[moduleOrReq]
loader = getattr(module, '__loader__', None)
return _find_adapter(_provider_factories, loader)(module)
def _macosx_vers(_cache=[]):
if not _cache:
version = platform.mac_ver()[0]
# fallback for MacPorts
if version == '':
plist = '/System/Library/CoreServices/SystemVersion.plist'
if os.path.exists(plist):
if hasattr(plistlib, 'readPlist'):
plist_content = plistlib.readPlist(plist)
if 'ProductVersion' in plist_content:
version = plist_content['ProductVersion']
_cache.append(version.split('.'))
return _cache[0]
def _macosx_arch(machine):
return {'PowerPC': 'ppc', 'Power_Macintosh': 'ppc'}.get(machine, machine)
def get_build_platform():
"""Return this platform's string for platform-specific distributions
XXX Currently this is the same as ``distutils.util.get_platform()``, but it
needs some hacks for Linux and Mac OS X.
"""
try:
# Python 2.7 or >=3.2
from sysconfig import get_platform
except ImportError:
from distutils.util import get_platform
plat = get_platform()
if sys.platform == "darwin" and not plat.startswith('macosx-'):
try:
version = _macosx_vers()
machine = os.uname()[4].replace(" ", "_")
return "macosx-%d.%d-%s" % (
int(version[0]), int(version[1]),
_macosx_arch(machine),
)
except ValueError:
# if someone is running a non-Mac darwin system, this will fall
# through to the default implementation
pass
return plat
macosVersionString = re.compile(r"macosx-(\d+)\.(\d+)-(.*)")
darwinVersionString = re.compile(r"darwin-(\d+)\.(\d+)\.(\d+)-(.*)")
# XXX backward compat
get_platform = get_build_platform
def compatible_platforms(provided, required):
"""Can code for the `provided` platform run on the `required` platform?
Returns true if either platform is ``None``, or the platforms are equal.
XXX Needs compatibility checks for Linux and other unixy OSes.
"""
if provided is None or required is None or provided == required:
# easy case
return True
# Mac OS X special cases
reqMac = macosVersionString.match(required)
if reqMac:
provMac = macosVersionString.match(provided)
# is this a Mac package?
if not provMac:
# this is backwards compatibility for packages built before
# setuptools 0.6. All packages built after this point will
# use the new macosx designation.
provDarwin = darwinVersionString.match(provided)
if provDarwin:
dversion = int(provDarwin.group(1))
macosversion = "%s.%s" % (reqMac.group(1), reqMac.group(2))
if dversion == 7 and macosversion >= "10.3" or \
dversion == 8 and macosversion >= "10.4":
return True
# egg isn't macosx or legacy darwin
return False
# are they the same major version and machine type?
if provMac.group(1) != reqMac.group(1) or \
provMac.group(3) != reqMac.group(3):
return False
# is the required OS major update >= the provided one?
if int(provMac.group(2)) > int(reqMac.group(2)):
return False
return True
# XXX Linux and other platforms' special cases should go here
return False
def run_script(dist_spec, script_name):
"""Locate distribution `dist_spec` and run its `script_name` script"""
ns = sys._getframe(1).f_globals
name = ns['__name__']
ns.clear()
ns['__name__'] = name
require(dist_spec)[0].run_script(script_name, ns)
# backward compatibility
run_main = run_script
def get_distribution(dist):
"""Return a current distribution object for a Requirement or string"""
if isinstance(dist, six.string_types):
dist = Requirement.parse(dist)
if isinstance(dist, Requirement):
dist = get_provider(dist)
if not isinstance(dist, Distribution):
raise TypeError("Expected string, Requirement, or Distribution", dist)
return dist
def load_entry_point(dist, group, name):
"""Return `name` entry point of `group` for `dist` or raise ImportError"""
return get_distribution(dist).load_entry_point(group, name)
def get_entry_map(dist, group=None):
"""Return the entry point map for `group`, or the full entry map"""
return get_distribution(dist).get_entry_map(group)
def get_entry_info(dist, group, name):
"""Return the EntryPoint object for `group`+`name`, or ``None``"""
return get_distribution(dist).get_entry_info(group, name)
class IMetadataProvider:
def has_metadata(name):
"""Does the package's distribution contain the named metadata?"""
def get_metadata(name):
"""The named metadata resource as a string"""
def get_metadata_lines(name):
"""Yield named metadata resource as list of non-blank non-comment lines
Leading and trailing whitespace is stripped from each line, and lines
with ``#`` as the first non-blank character are omitted."""
def metadata_isdir(name):
"""Is the named metadata a directory? (like ``os.path.isdir()``)"""
def metadata_listdir(name):
"""List of metadata names in the directory (like ``os.listdir()``)"""
def run_script(script_name, namespace):
"""Execute the named script in the supplied namespace dictionary"""
class IResourceProvider(IMetadataProvider):
"""An object that provides access to package resources"""
def get_resource_filename(manager, resource_name):
"""Return a true filesystem path for `resource_name`
`manager` must be an ``IResourceManager``"""
def get_resource_stream(manager, resource_name):
"""Return a readable file-like object for `resource_name`
`manager` must be an ``IResourceManager``"""
def get_resource_string(manager, resource_name):
"""Return a string containing the contents of `resource_name`
`manager` must be an ``IResourceManager``"""
def has_resource(resource_name):
"""Does the package contain the named resource?"""
def resource_isdir(resource_name):
"""Is the named resource a directory? (like ``os.path.isdir()``)"""
def resource_listdir(resource_name):
"""List of resource names in the directory (like ``os.listdir()``)"""
class WorkingSet(object):
"""A collection of active distributions on sys.path (or a similar list)"""
def __init__(self, entries=None):
"""Create working set from list of path entries (default=sys.path)"""
self.entries = []
self.entry_keys = {}
self.by_key = {}
self.callbacks = []
if entries is None:
entries = sys.path
for entry in entries:
self.add_entry(entry)
@classmethod
def _build_master(cls):
"""
Prepare the master working set.
"""
ws = cls()
try:
from __main__ import __requires__
except ImportError:
# The main program does not list any requirements
return ws
# ensure the requirements are met
try:
ws.require(__requires__)
except VersionConflict:
return cls._build_from_requirements(__requires__)
return ws
@classmethod
def _build_from_requirements(cls, req_spec):
"""
Build a working set from a requirement spec. Rewrites sys.path.
"""
# try it without defaults already on sys.path
# by starting with an empty path
ws = cls([])
reqs = parse_requirements(req_spec)
dists = ws.resolve(reqs, Environment())
for dist in dists:
ws.add(dist)
# add any missing entries from sys.path
for entry in sys.path:
if entry not in ws.entries:
ws.add_entry(entry)
# then copy back to sys.path
sys.path[:] = ws.entries
return ws
def add_entry(self, entry):
"""Add a path item to ``.entries``, finding any distributions on it
``find_distributions(entry, True)`` is used to find distributions
corresponding to the path entry, and they are added. `entry` is
always appended to ``.entries``, even if it is already present.
(This is because ``sys.path`` can contain the same value more than
once, and the ``.entries`` of the ``sys.path`` WorkingSet should always
equal ``sys.path``.)
"""
self.entry_keys.setdefault(entry, [])
self.entries.append(entry)
for dist in find_distributions(entry, True):
self.add(dist, entry, False)
def __contains__(self, dist):
"""True if `dist` is the active distribution for its project"""
return self.by_key.get(dist.key) == dist
def find(self, req):
"""Find a distribution matching requirement `req`
If there is an active distribution for the requested project, this
returns it as long as it meets the version requirement specified by
`req`. But, if there is an active distribution for the project and it
does *not* meet the `req` requirement, ``VersionConflict`` is raised.
If there is no active distribution for the requested project, ``None``
is returned.
"""
dist = self.by_key.get(req.key)
if dist is not None and dist not in req:
# XXX add more info
raise VersionConflict(dist, req)
return dist
def iter_entry_points(self, group, name=None):
"""Yield entry point objects from `group` matching `name`
If `name` is None, yields all entry points in `group` from all
distributions in the working set, otherwise only ones matching
both `group` and `name` are yielded (in distribution order).
"""
for dist in self:
entries = dist.get_entry_map(group)
if name is None:
for ep in entries.values():
yield ep
elif name in entries:
yield entries[name]
def run_script(self, requires, script_name):
"""Locate distribution for `requires` and run `script_name` script"""
ns = sys._getframe(1).f_globals
name = ns['__name__']
ns.clear()
ns['__name__'] = name
self.require(requires)[0].run_script(script_name, ns)
def __iter__(self):
"""Yield distributions for non-duplicate projects in the working set
The yield order is the order in which the items' path entries were
added to the working set.
"""
seen = {}
for item in self.entries:
if item not in self.entry_keys:
# workaround a cache issue
continue
for key in self.entry_keys[item]:
if key not in seen:
seen[key] = 1
yield self.by_key[key]
def add(self, dist, entry=None, insert=True, replace=False):
"""Add `dist` to working set, associated with `entry`
If `entry` is unspecified, it defaults to the ``.location`` of `dist`.
On exit from this routine, `entry` is added to the end of the working
set's ``.entries`` (if it wasn't already present).
`dist` is only added to the working set if it's for a project that
doesn't already have a distribution in the set, unless `replace=True`.
If it's added, any callbacks registered with the ``subscribe()`` method
will be called.
"""
if insert:
dist.insert_on(self.entries, entry, replace=replace)
if entry is None:
entry = dist.location
keys = self.entry_keys.setdefault(entry, [])
keys2 = self.entry_keys.setdefault(dist.location, [])
if not replace and dist.key in self.by_key:
# ignore hidden distros
return
self.by_key[dist.key] = dist
if dist.key not in keys:
keys.append(dist.key)
if dist.key not in keys2:
keys2.append(dist.key)
self._added_new(dist)
def resolve(self, requirements, env=None, installer=None,
replace_conflicting=False, extras=None):
"""List all distributions needed to (recursively) meet `requirements`
`requirements` must be a sequence of ``Requirement`` objects. `env`,
if supplied, should be an ``Environment`` instance. If
not supplied, it defaults to all distributions available within any
entry or distribution in the working set. `installer`, if supplied,
will be invoked with each requirement that cannot be met by an
already-installed distribution; it should return a ``Distribution`` or
``None``.
Unless `replace_conflicting=True`, raises a VersionConflict exception
if
any requirements are found on the path that have the correct name but
the wrong version. Otherwise, if an `installer` is supplied it will be
invoked to obtain the correct version of the requirement and activate
it.
`extras` is a list of the extras to be used with these requirements.
This is important because extra requirements may look like `my_req;
extra = "my_extra"`, which would otherwise be interpreted as a purely
optional requirement. Instead, we want to be able to assert that these
requirements are truly required.
"""
# set up the stack
requirements = list(requirements)[::-1]
# set of processed requirements
processed = {}
# key -> dist
best = {}
to_activate = []
req_extras = _ReqExtras()
# Mapping of requirement to set of distributions that required it;
# useful for reporting info about conflicts.
required_by = collections.defaultdict(set)
while requirements:
# process dependencies breadth-first
req = requirements.pop(0)
if req in processed:
# Ignore cyclic or redundant dependencies
continue
if not req_extras.markers_pass(req, extras):
continue
dist = best.get(req.key)
if dist is None:
# Find the best distribution and add it to the map
dist = self.by_key.get(req.key)
if dist is None or (dist not in req and replace_conflicting):
ws = self
if env is None:
if dist is None:
env = Environment(self.entries)
else:
# Use an empty environment and workingset to avoid
# any further conflicts with the conflicting
# distribution
env = Environment([])
ws = WorkingSet([])
dist = best[req.key] = env.best_match(
req, ws, installer,
replace_conflicting=replace_conflicting
)
if dist is None:
requirers = required_by.get(req, None)
raise DistributionNotFound(req, requirers)
to_activate.append(dist)
if dist not in req:
# Oops, the "best" so far conflicts with a dependency
dependent_req = required_by[req]
raise VersionConflict(dist, req).with_context(dependent_req)
# push the new requirements onto the stack
new_requirements = dist.requires(req.extras)[::-1]
requirements.extend(new_requirements)
# Register the new requirements needed by req
for new_requirement in new_requirements:
required_by[new_requirement].add(req.project_name)
req_extras[new_requirement] = req.extras
processed[req] = True
# return list of distros to activate
return to_activate
def find_plugins(
self, plugin_env, full_env=None, installer=None, fallback=True):
"""Find all activatable distributions in `plugin_env`
Example usage::
distributions, errors = working_set.find_plugins(
Environment(plugin_dirlist)
)
# add plugins+libs to sys.path
map(working_set.add, distributions)
# display errors
print('Could not load', errors)
The `plugin_env` should be an ``Environment`` instance that contains
only distributions that are in the project's "plugin directory" or
directories. The `full_env`, if supplied, should be an ``Environment``
contains all currently-available distributions. If `full_env` is not
supplied, one is created automatically from the ``WorkingSet`` this
method is called on, which will typically mean that every directory on
``sys.path`` will be scanned for distributions.
`installer` is a standard installer callback as used by the
``resolve()`` method. The `fallback` flag indicates whether we should
attempt to resolve older versions of a plugin if the newest version
cannot be resolved.
This method returns a 2-tuple: (`distributions`, `error_info`), where
`distributions` is a list of the distributions found in `plugin_env`
that were loadable, along with any other distributions that are needed
to resolve their dependencies. `error_info` is a dictionary mapping
unloadable plugin distributions to an exception instance describing the
error that occurred. Usually this will be a ``DistributionNotFound`` or
``VersionConflict`` instance.
"""
plugin_projects = list(plugin_env)
# scan project names in alphabetic order
plugin_projects.sort()
error_info = {}
distributions = {}
if full_env is None:
env = Environment(self.entries)
env += plugin_env
else:
env = full_env + plugin_env
shadow_set = self.__class__([])
# put all our entries in shadow_set
list(map(shadow_set.add, self))
for project_name in plugin_projects:
for dist in plugin_env[project_name]:
req = [dist.as_requirement()]
try:
resolvees = shadow_set.resolve(req, env, installer)
except ResolutionError as v:
# save error info
error_info[dist] = v
if fallback:
# try the next older version of project
continue
else:
# give up on this project, keep going
break
else:
list(map(shadow_set.add, resolvees))
distributions.update(dict.fromkeys(resolvees))
# success, no need to try any more versions of this project
break
distributions = list(distributions)
distributions.sort()
return distributions, error_info
def require(self, *requirements):
"""Ensure that distributions matching `requirements` are activated
`requirements` must be a string or a (possibly-nested) sequence
thereof, specifying the distributions and versions required. The
return value is a sequence of the distributions that needed to be
activated to fulfill the requirements; all relevant distributions are
included, even if they were already activated in this working set.
"""
needed = self.resolve(parse_requirements(requirements))
for dist in needed:
self.add(dist)
return needed
def subscribe(self, callback, existing=True):
"""Invoke `callback` for all distributions
If `existing=True` (default),
call on all existing ones, as well.
"""
if callback in self.callbacks:
return
self.callbacks.append(callback)
if not existing:
return
for dist in self:
callback(dist)
def _added_new(self, dist):
for callback in self.callbacks:
callback(dist)
def __getstate__(self):
return (
self.entries[:], self.entry_keys.copy(), self.by_key.copy(),
self.callbacks[:]
)
def __setstate__(self, e_k_b_c):
entries, keys, by_key, callbacks = e_k_b_c
self.entries = entries[:]
self.entry_keys = keys.copy()
self.by_key = by_key.copy()
self.callbacks = callbacks[:]
class _ReqExtras(dict):
"""
Map each requirement to the extras that demanded it.
"""
def markers_pass(self, req, extras=None):
"""
Evaluate markers for req against each extra that
demanded it.
Return False if the req has a marker and fails
evaluation. Otherwise, return True.
"""
extra_evals = (
req.marker.evaluate({'extra': extra})
for extra in self.get(req, ()) + (extras or (None,))
)
return not req.marker or any(extra_evals)
class Environment(object):
"""Searchable snapshot of distributions on a search path"""
def __init__(
self, search_path=None, platform=get_supported_platform(),
python=PY_MAJOR):
"""Snapshot distributions available on a search path
Any distributions found on `search_path` are added to the environment.
`search_path` should be a sequence of ``sys.path`` items. If not
supplied, ``sys.path`` is used.
`platform` is an optional string specifying the name of the platform
that platform-specific distributions must be compatible with. If
unspecified, it defaults to the current platform. `python` is an
optional string naming the desired version of Python (e.g. ``'3.3'``);
it defaults to the current version.
You may explicitly set `platform` (and/or `python`) to ``None`` if you
wish to map *all* distributions, not just those compatible with the
running platform or Python version.
"""
self._distmap = {}
self.platform = platform
self.python = python
self.scan(search_path)
def can_add(self, dist):
"""Is distribution `dist` acceptable for this environment?
The distribution must match the platform and python version
requirements specified when this environment was created, or False
is returned.
"""
py_compat = (
self.python is None
or dist.py_version is None
or dist.py_version == self.python
)
return py_compat and compatible_platforms(dist.platform, self.platform)
def remove(self, dist):
"""Remove `dist` from the environment"""
self._distmap[dist.key].remove(dist)
def scan(self, search_path=None):
"""Scan `search_path` for distributions usable in this environment
Any distributions found are added to the environment.
`search_path` should be a sequence of ``sys.path`` items. If not
supplied, ``sys.path`` is used. Only distributions conforming to
the platform/python version defined at initialization are added.
"""
if search_path is None:
search_path = sys.path
for item in search_path:
for dist in find_distributions(item):
self.add(dist)
def __getitem__(self, project_name):
"""Return a newest-to-oldest list of distributions for `project_name`
Uses case-insensitive `project_name` comparison, assuming all the
project's distributions use their project's name converted to all
lowercase as their key.
"""
distribution_key = project_name.lower()
return self._distmap.get(distribution_key, [])
def add(self, dist):
"""Add `dist` if we ``can_add()`` it and it has not already been added
"""
if self.can_add(dist) and dist.has_version():
dists = self._distmap.setdefault(dist.key, [])
if dist not in dists:
dists.append(dist)
dists.sort(key=operator.attrgetter('hashcmp'), reverse=True)
def best_match(
self, req, working_set, installer=None, replace_conflicting=False):
"""Find distribution best matching `req` and usable on `working_set`
This calls the ``find(req)`` method of the `working_set` to see if a
suitable distribution is already active. (This may raise
``VersionConflict`` if an unsuitable version of the project is already
active in the specified `working_set`.) If a suitable distribution
isn't active, this method returns the newest distribution in the
environment that meets the ``Requirement`` in `req`. If no suitable
distribution is found, and `installer` is supplied, then the result of
calling the environment's ``obtain(req, installer)`` method will be
returned.
"""
try:
dist = working_set.find(req)
except VersionConflict:
if not replace_conflicting:
raise
dist = None
if dist is not None:
return dist
for dist in self[req.key]:
if dist in req:
return dist
# try to download/install
return self.obtain(req, installer)
def obtain(self, requirement, installer=None):
"""Obtain a distribution matching `requirement` (e.g. via download)
Obtain a distro that matches requirement (e.g. via download). In the
base ``Environment`` class, this routine just returns
``installer(requirement)``, unless `installer` is None, in which case
None is returned instead. This method is a hook that allows subclasses
to attempt other ways of obtaining a distribution before falling back
to the `installer` argument."""
if installer is not None:
return installer(requirement)
def __iter__(self):
"""Yield the unique project names of the available distributions"""
for key in self._distmap.keys():
if self[key]:
yield key
def __iadd__(self, other):
"""In-place addition of a distribution or environment"""
if isinstance(other, Distribution):
self.add(other)
elif isinstance(other, Environment):
for project in other:
for dist in other[project]:
self.add(dist)
else:
raise TypeError("Can't add %r to environment" % (other,))
return self
def __add__(self, other):
"""Add an environment or distribution to an environment"""
new = self.__class__([], platform=None, python=None)
for env in self, other:
new += env
return new
# XXX backward compatibility
AvailableDistributions = Environment
class ExtractionError(RuntimeError):
"""An error occurred extracting a resource
The following attributes are available from instances of this exception:
manager
The resource manager that raised this exception
cache_path
The base directory for resource extraction
original_error
The exception instance that caused extraction to fail
"""
class ResourceManager:
"""Manage resource extraction and packages"""
extraction_path = None
def __init__(self):
self.cached_files = {}
def resource_exists(self, package_or_requirement, resource_name):
"""Does the named resource exist?"""
return get_provider(package_or_requirement).has_resource(resource_name)
def resource_isdir(self, package_or_requirement, resource_name):
"""Is the named resource an existing directory?"""
return get_provider(package_or_requirement).resource_isdir(
resource_name
)
def resource_filename(self, package_or_requirement, resource_name):
"""Return a true filesystem path for specified resource"""
return get_provider(package_or_requirement).get_resource_filename(
self, resource_name
)
def resource_stream(self, package_or_requirement, resource_name):
"""Return a readable file-like object for specified resource"""
return get_provider(package_or_requirement).get_resource_stream(
self, resource_name
)
def resource_string(self, package_or_requirement, resource_name):
"""Return specified resource as a string"""
return get_provider(package_or_requirement).get_resource_string(
self, resource_name
)
def resource_listdir(self, package_or_requirement, resource_name):
"""List the contents of the named resource directory"""
return get_provider(package_or_requirement).resource_listdir(
resource_name
)
def extraction_error(self):
"""Give an error message for problems extracting file(s)"""
old_exc = sys.exc_info()[1]
cache_path = self.extraction_path or get_default_cache()
tmpl = textwrap.dedent("""
Can't extract file(s) to egg cache
The following error occurred while trying to extract file(s)
to the Python egg cache:
{old_exc}
The Python egg cache directory is currently set to:
{cache_path}
Perhaps your account does not have write access to this directory?
You can change the cache directory by setting the PYTHON_EGG_CACHE
environment variable to point to an accessible directory.
""").lstrip()
err = ExtractionError(tmpl.format(**locals()))
err.manager = self
err.cache_path = cache_path
err.original_error = old_exc
raise err
def get_cache_path(self, archive_name, names=()):
"""Return absolute location in cache for `archive_name` and `names`
The parent directory of the resulting path will be created if it does
not already exist. `archive_name` should be the base filename of the
enclosing egg (which may not be the name of the enclosing zipfile!),
including its ".egg" extension. `names`, if provided, should be a
sequence of path name parts "under" the egg's extraction location.
This method should only be called by resource providers that need to
obtain an extraction location, and only for names they intend to
extract, as it tracks the generated names for possible cleanup later.
"""
extract_path = self.extraction_path or get_default_cache()
target_path = os.path.join(extract_path, archive_name + '-tmp', *names)
try:
_bypass_ensure_directory(target_path)
except Exception:
self.extraction_error()
self._warn_unsafe_extraction_path(extract_path)
self.cached_files[target_path] = 1
return target_path
@staticmethod
def _warn_unsafe_extraction_path(path):
"""
If the default extraction path is overridden and set to an insecure
location, such as /tmp, it opens up an opportunity for an attacker to
replace an extracted file with an unauthorized payload. Warn the user
if a known insecure location is used.
See Distribute #375 for more details.
"""
if os.name == 'nt' and not path.startswith(os.environ['windir']):
# On Windows, permissions are generally restrictive by default
# and temp directories are not writable by other users, so
# bypass the warning.
return
mode = os.stat(path).st_mode
if mode & stat.S_IWOTH or mode & stat.S_IWGRP:
msg = (
"%s is writable by group/others and vulnerable to attack "
"when "
"used with get_resource_filename. Consider a more secure "
"location (set with .set_extraction_path or the "
"PYTHON_EGG_CACHE environment variable)." % path
)
warnings.warn(msg, UserWarning)
def postprocess(self, tempname, filename):
"""Perform any platform-specific postprocessing of `tempname`
This is where Mac header rewrites should be done; other platforms don't
have anything special they should do.
Resource providers should call this method ONLY after successfully
extracting a compressed resource. They must NOT call it on resources
that are already in the filesystem.
`tempname` is the current (temporary) name of the file, and `filename`
is the name it will be renamed to by the caller after this routine
returns.
"""
if os.name == 'posix':
# Make the resource executable
mode = ((os.stat(tempname).st_mode) | 0o555) & 0o7777
os.chmod(tempname, mode)
def set_extraction_path(self, path):
"""Set the base path where resources will be extracted to, if needed.
If you do not call this routine before any extractions take place, the
path defaults to the return value of ``get_default_cache()``. (Which
is based on the ``PYTHON_EGG_CACHE`` environment variable, with various
platform-specific fallbacks. See that routine's documentation for more
details.)
Resources are extracted to subdirectories of this path based upon
information given by the ``IResourceProvider``. You may set this to a
temporary directory, but then you must call ``cleanup_resources()`` to
delete the extracted files when done. There is no guarantee that
``cleanup_resources()`` will be able to remove all extracted files.
(Note: you may not change the extraction path for a given resource
manager once resources have been extracted, unless you first call
``cleanup_resources()``.)
"""
if self.cached_files:
raise ValueError(
"Can't change extraction path, files already extracted"
)
self.extraction_path = path
def cleanup_resources(self, force=False):
"""
Delete all extracted resource files and directories, returning a list
of the file and directory names that could not be successfully removed.
This function does not have any concurrency protection, so it should
generally only be called when the extraction path is a temporary
directory exclusive to a single process. This method is not
automatically called; you must call it explicitly or register it as an
``atexit`` function if you wish to ensure cleanup of a temporary
directory used for extractions.
"""
# XXX
def get_default_cache():
"""
Return the ``PYTHON_EGG_CACHE`` environment variable
or a platform-relevant user cache dir for an app
named "Python-Eggs".
"""
return (
os.environ.get('PYTHON_EGG_CACHE')
or appdirs.user_cache_dir(appname='Python-Eggs')
)
def safe_name(name):
"""Convert an arbitrary string to a standard distribution name
Any runs of non-alphanumeric/. characters are replaced with a single '-'.
"""
return re.sub('[^A-Za-z0-9.]+', '-', name)
def safe_version(version):
"""
Convert an arbitrary string to a standard version string
"""
try:
# normalize the version
return str(packaging.version.Version(version))
except packaging.version.InvalidVersion:
version = version.replace(' ', '.')
return re.sub('[^A-Za-z0-9.]+', '-', version)
def safe_extra(extra):
"""Convert an arbitrary string to a standard 'extra' name
Any runs of non-alphanumeric characters are replaced with a single '_',
and the result is always lowercased.
"""
return re.sub('[^A-Za-z0-9.-]+', '_', extra).lower()
def to_filename(name):
"""Convert a project or version name to its filename-escaped form
Any '-' characters are currently replaced with '_'.
"""
return name.replace('-', '_')
def invalid_marker(text):
"""
Validate text as a PEP 508 environment marker; return an exception
if invalid or False otherwise.
"""
try:
evaluate_marker(text)
except SyntaxError as e:
e.filename = None
e.lineno = None
return e
return False
def evaluate_marker(text, extra=None):
"""
Evaluate a PEP 508 environment marker.
Return a boolean indicating the marker result in this environment.
Raise SyntaxError if marker is invalid.
This implementation uses the 'pyparsing' module.
"""
try:
marker = packaging.markers.Marker(text)
return marker.evaluate()
except packaging.markers.InvalidMarker as e:
raise SyntaxError(e)
class NullProvider:
"""Try to implement resources and metadata for arbitrary PEP 302 loaders"""
egg_name = None
egg_info = None
loader = None
def __init__(self, module):
self.loader = getattr(module, '__loader__', None)
self.module_path = os.path.dirname(getattr(module, '__file__', ''))
def get_resource_filename(self, manager, resource_name):
return self._fn(self.module_path, resource_name)
def get_resource_stream(self, manager, resource_name):
return io.BytesIO(self.get_resource_string(manager, resource_name))
def get_resource_string(self, manager, resource_name):
return self._get(self._fn(self.module_path, resource_name))
def has_resource(self, resource_name):
return self._has(self._fn(self.module_path, resource_name))
def has_metadata(self, name):
return self.egg_info and self._has(self._fn(self.egg_info, name))
def get_metadata(self, name):
if not self.egg_info:
return ""
value = self._get(self._fn(self.egg_info, name))
return value.decode('utf-8') if six.PY3 else value
def get_metadata_lines(self, name):
return yield_lines(self.get_metadata(name))
def resource_isdir(self, resource_name):
return self._isdir(self._fn(self.module_path, resource_name))
def metadata_isdir(self, name):
return self.egg_info and self._isdir(self._fn(self.egg_info, name))
def resource_listdir(self, resource_name):
return self._listdir(self._fn(self.module_path, resource_name))
def metadata_listdir(self, name):
if self.egg_info:
return self._listdir(self._fn(self.egg_info, name))
return []
def run_script(self, script_name, namespace):
script = 'scripts/' + script_name
if not self.has_metadata(script):
raise ResolutionError(
"Script {script!r} not found in metadata at {self.egg_info!r}"
.format(**locals()),
)
script_text = self.get_metadata(script).replace('\r\n', '\n')
script_text = script_text.replace('\r', '\n')
script_filename = self._fn(self.egg_info, script)
namespace['__file__'] = script_filename
if os.path.exists(script_filename):
source = open(script_filename).read()
code = compile(source, script_filename, 'exec')
exec(code, namespace, namespace)
else:
from linecache import cache
cache[script_filename] = (
len(script_text), 0, script_text.split('\n'), script_filename
)
script_code = compile(script_text, script_filename, 'exec')
exec(script_code, namespace, namespace)
def _has(self, path):
raise NotImplementedError(
"Can't perform this operation for unregistered loader type"
)
def _isdir(self, path):
raise NotImplementedError(
"Can't perform this operation for unregistered loader type"
)
def _listdir(self, path):
raise NotImplementedError(
"Can't perform this operation for unregistered loader type"
)
def _fn(self, base, resource_name):
if resource_name:
return os.path.join(base, *resource_name.split('/'))
return base
def _get(self, path):
if hasattr(self.loader, 'get_data'):
return self.loader.get_data(path)
raise NotImplementedError(
"Can't perform this operation for loaders without 'get_data()'"
)
register_loader_type(object, NullProvider)
class EggProvider(NullProvider):
"""Provider based on a virtual filesystem"""
def __init__(self, module):
NullProvider.__init__(self, module)
self._setup_prefix()
def _setup_prefix(self):
# we assume here that our metadata may be nested inside a "basket"
# of multiple eggs; that's why we use module_path instead of .archive
path = self.module_path
old = None
while path != old:
if _is_egg_path(path):
self.egg_name = os.path.basename(path)
self.egg_info = os.path.join(path, 'EGG-INFO')
self.egg_root = path
break
old = path
path, base = os.path.split(path)
class DefaultProvider(EggProvider):
"""Provides access to package resources in the filesystem"""
def _has(self, path):
return os.path.exists(path)
def _isdir(self, path):
return os.path.isdir(path)
def _listdir(self, path):
return os.listdir(path)
def get_resource_stream(self, manager, resource_name):
return open(self._fn(self.module_path, resource_name), 'rb')
def _get(self, path):
with open(path, 'rb') as stream:
return stream.read()
@classmethod
def _register(cls):
loader_cls = getattr(
importlib_machinery,
'SourceFileLoader',
type(None),
)
register_loader_type(loader_cls, cls)
DefaultProvider._register()
class EmptyProvider(NullProvider):
"""Provider that returns nothing for all requests"""
module_path = None
_isdir = _has = lambda self, path: False
def _get(self, path):
return ''
def _listdir(self, path):
return []
def __init__(self):
pass
empty_provider = EmptyProvider()
class ZipManifests(dict):
"""
zip manifest builder
"""
@classmethod
def build(cls, path):
"""
Build a dictionary similar to the zipimport directory
caches, except instead of tuples, store ZipInfo objects.
Use a platform-specific path separator (os.sep) for the path keys
for compatibility with pypy on Windows.
"""
with zipfile.ZipFile(path) as zfile:
items = (
(
name.replace('/', os.sep),
zfile.getinfo(name),
)
for name in zfile.namelist()
)
return dict(items)
load = build
class MemoizedZipManifests(ZipManifests):
"""
Memoized zipfile manifests.
"""
manifest_mod = collections.namedtuple('manifest_mod', 'manifest mtime')
def load(self, path):
"""
Load a manifest at path or return a suitable manifest already loaded.
"""
path = os.path.normpath(path)
mtime = os.stat(path).st_mtime
if path not in self or self[path].mtime != mtime:
manifest = self.build(path)
self[path] = self.manifest_mod(manifest, mtime)
return self[path].manifest
class ZipProvider(EggProvider):
"""Resource support for zips and eggs"""
eagers = None
_zip_manifests = MemoizedZipManifests()
def __init__(self, module):
EggProvider.__init__(self, module)
self.zip_pre = self.loader.archive + os.sep
def _zipinfo_name(self, fspath):
# Convert a virtual filename (full path to file) into a zipfile subpath
# usable with the zipimport directory cache for our target archive
fspath = fspath.rstrip(os.sep)
if fspath == self.loader.archive:
return ''
if fspath.startswith(self.zip_pre):
return fspath[len(self.zip_pre):]
raise AssertionError(
"%s is not a subpath of %s" % (fspath, self.zip_pre)
)
def _parts(self, zip_path):
# Convert a zipfile subpath into an egg-relative path part list.
# pseudo-fs path
fspath = self.zip_pre + zip_path
if fspath.startswith(self.egg_root + os.sep):
return fspath[len(self.egg_root) + 1:].split(os.sep)
raise AssertionError(
"%s is not a subpath of %s" % (fspath, self.egg_root)
)
@property
def zipinfo(self):
return self._zip_manifests.load(self.loader.archive)
def get_resource_filename(self, manager, resource_name):
if not self.egg_name:
raise NotImplementedError(
"resource_filename() only supported for .egg, not .zip"
)
# no need to lock for extraction, since we use temp names
zip_path = self._resource_to_zip(resource_name)
eagers = self._get_eager_resources()
if '/'.join(self._parts(zip_path)) in eagers:
for name in eagers:
self._extract_resource(manager, self._eager_to_zip(name))
return self._extract_resource(manager, zip_path)
@staticmethod
def _get_date_and_size(zip_stat):
size = zip_stat.file_size
# ymdhms+wday, yday, dst
date_time = zip_stat.date_time + (0, 0, -1)
# 1980 offset already done
timestamp = time.mktime(date_time)
return timestamp, size
def _extract_resource(self, manager, zip_path):
if zip_path in self._index():
for name in self._index()[zip_path]:
last = self._extract_resource(
manager, os.path.join(zip_path, name)
)
# return the extracted directory name
return os.path.dirname(last)
timestamp, size = self._get_date_and_size(self.zipinfo[zip_path])
if not WRITE_SUPPORT:
raise IOError('"os.rename" and "os.unlink" are not supported '
'on this platform')
try:
real_path = manager.get_cache_path(
self.egg_name, self._parts(zip_path)
)
if self._is_current(real_path, zip_path):
return real_path
outf, tmpnam = _mkstemp(
".$extract",
dir=os.path.dirname(real_path),
)
os.write(outf, self.loader.get_data(zip_path))
os.close(outf)
utime(tmpnam, (timestamp, timestamp))
manager.postprocess(tmpnam, real_path)
try:
rename(tmpnam, real_path)
except os.error:
if os.path.isfile(real_path):
if self._is_current(real_path, zip_path):
# the file became current since it was checked above,
# so proceed.
return real_path
# Windows, del old file and retry
elif os.name == 'nt':
unlink(real_path)
rename(tmpnam, real_path)
return real_path
raise
except os.error:
# report a user-friendly error
manager.extraction_error()
return real_path
def _is_current(self, file_path, zip_path):
"""
Return True if the file_path is current for this zip_path
"""
timestamp, size = self._get_date_and_size(self.zipinfo[zip_path])
if not os.path.isfile(file_path):
return False
stat = os.stat(file_path)
if stat.st_size != size or stat.st_mtime != timestamp:
return False
# check that the contents match
zip_contents = self.loader.get_data(zip_path)
with open(file_path, 'rb') as f:
file_contents = f.read()
return zip_contents == file_contents
def _get_eager_resources(self):
if self.eagers is None:
eagers = []
for name in ('native_libs.txt', 'eager_resources.txt'):
if self.has_metadata(name):
eagers.extend(self.get_metadata_lines(name))
self.eagers = eagers
return self.eagers
def _index(self):
try:
return self._dirindex
except AttributeError:
ind = {}
for path in self.zipinfo:
parts = path.split(os.sep)
while parts:
parent = os.sep.join(parts[:-1])
if parent in ind:
ind[parent].append(parts[-1])
break
else:
ind[parent] = [parts.pop()]
self._dirindex = ind
return ind
def _has(self, fspath):
zip_path = self._zipinfo_name(fspath)
return zip_path in self.zipinfo or zip_path in self._index()
def _isdir(self, fspath):
return self._zipinfo_name(fspath) in self._index()
def _listdir(self, fspath):
return list(self._index().get(self._zipinfo_name(fspath), ()))
def _eager_to_zip(self, resource_name):
return self._zipinfo_name(self._fn(self.egg_root, resource_name))
def _resource_to_zip(self, resource_name):
return self._zipinfo_name(self._fn(self.module_path, resource_name))
register_loader_type(zipimport.zipimporter, ZipProvider)
class FileMetadata(EmptyProvider):
"""Metadata handler for standalone PKG-INFO files
Usage::
metadata = FileMetadata("/path/to/PKG-INFO")
This provider rejects all data and metadata requests except for PKG-INFO,
which is treated as existing, and will be the contents of the file at
the provided location.
"""
def __init__(self, path):
self.path = path
def has_metadata(self, name):
return name == 'PKG-INFO' and os.path.isfile(self.path)
def get_metadata(self, name):
if name != 'PKG-INFO':
raise KeyError("No metadata except PKG-INFO is available")
with io.open(self.path, encoding='utf-8', errors="replace") as f:
metadata = f.read()
self._warn_on_replacement(metadata)
return metadata
def _warn_on_replacement(self, metadata):
# Python 2.7 compat for: replacement_char = '�'
replacement_char = b'\xef\xbf\xbd'.decode('utf-8')
if replacement_char in metadata:
tmpl = "{self.path} could not be properly decoded in UTF-8"
msg = tmpl.format(**locals())
warnings.warn(msg)
def get_metadata_lines(self, name):
return yield_lines(self.get_metadata(name))
class PathMetadata(DefaultProvider):
"""Metadata provider for egg directories
Usage::
# Development eggs:
egg_info = "/path/to/PackageName.egg-info"
base_dir = os.path.dirname(egg_info)
metadata = PathMetadata(base_dir, egg_info)
dist_name = os.path.splitext(os.path.basename(egg_info))[0]
dist = Distribution(basedir, project_name=dist_name, metadata=metadata)
# Unpacked egg directories:
egg_path = "/path/to/PackageName-ver-pyver-etc.egg"
metadata = PathMetadata(egg_path, os.path.join(egg_path,'EGG-INFO'))
dist = Distribution.from_filename(egg_path, metadata=metadata)
"""
def __init__(self, path, egg_info):
self.module_path = path
self.egg_info = egg_info
class EggMetadata(ZipProvider):
"""Metadata provider for .egg files"""
def __init__(self, importer):
"""Create a metadata provider from a zipimporter"""
self.zip_pre = importer.archive + os.sep
self.loader = importer
if importer.prefix:
self.module_path = os.path.join(importer.archive, importer.prefix)
else:
self.module_path = importer.archive
self._setup_prefix()
_declare_state('dict', _distribution_finders={})
def register_finder(importer_type, distribution_finder):
"""Register `distribution_finder` to find distributions in sys.path items
`importer_type` is the type or class of a PEP 302 "Importer" (sys.path item
handler), and `distribution_finder` is a callable that, passed a path
item and the importer instance, yields ``Distribution`` instances found on
that path item. See ``pkg_resources.find_on_path`` for an example."""
_distribution_finders[importer_type] = distribution_finder
def find_distributions(path_item, only=False):
"""Yield distributions accessible via `path_item`"""
importer = get_importer(path_item)
finder = _find_adapter(_distribution_finders, importer)
return finder(importer, path_item, only)
def find_eggs_in_zip(importer, path_item, only=False):
"""
Find eggs in zip files; possibly multiple nested eggs.
"""
if importer.archive.endswith('.whl'):
# wheels are not supported with this finder
# they don't have PKG-INFO metadata, and won't ever contain eggs
return
metadata = EggMetadata(importer)
if metadata.has_metadata('PKG-INFO'):
yield Distribution.from_filename(path_item, metadata=metadata)
if only:
# don't yield nested distros
return
for subitem in metadata.resource_listdir('/'):
if _is_egg_path(subitem):
subpath = os.path.join(path_item, subitem)
dists = find_eggs_in_zip(zipimport.zipimporter(subpath), subpath)
for dist in dists:
yield dist
elif subitem.lower().endswith('.dist-info'):
subpath = os.path.join(path_item, subitem)
submeta = EggMetadata(zipimport.zipimporter(subpath))
submeta.egg_info = subpath
yield Distribution.from_location(path_item, subitem, submeta)
register_finder(zipimport.zipimporter, find_eggs_in_zip)
def find_nothing(importer, path_item, only=False):
return ()
register_finder(object, find_nothing)
def _by_version_descending(names):
"""
Given a list of filenames, return them in descending order
by version number.
>>> names = 'bar', 'foo', 'Python-2.7.10.egg', 'Python-2.7.2.egg'
>>> _by_version_descending(names)
['Python-2.7.10.egg', 'Python-2.7.2.egg', 'foo', 'bar']
>>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.egg'
>>> _by_version_descending(names)
['Setuptools-1.2.3.egg', 'Setuptools-1.2.3b1.egg']
>>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.post1.egg'
>>> _by_version_descending(names)
['Setuptools-1.2.3.post1.egg', 'Setuptools-1.2.3b1.egg']
"""
def _by_version(name):
"""
Parse each component of the filename
"""
name, ext = os.path.splitext(name)
parts = itertools.chain(name.split('-'), [ext])
return [packaging.version.parse(part) for part in parts]
return sorted(names, key=_by_version, reverse=True)
def find_on_path(importer, path_item, only=False):
"""Yield distributions accessible on a sys.path directory"""
path_item = _normalize_cached(path_item)
if _is_unpacked_egg(path_item):
yield Distribution.from_filename(
path_item, metadata=PathMetadata(
path_item, os.path.join(path_item, 'EGG-INFO')
)
)
return
entries = safe_listdir(path_item)
# for performance, before sorting by version,
# screen entries for only those that will yield
# distributions
filtered = (
entry
for entry in entries
if dist_factory(path_item, entry, only)
)
# scan for .egg and .egg-info in directory
path_item_entries = _by_version_descending(filtered)
for entry in path_item_entries:
fullpath = os.path.join(path_item, entry)
factory = dist_factory(path_item, entry, only)
for dist in factory(fullpath):
yield dist
def dist_factory(path_item, entry, only):
"""
Return a dist_factory for a path_item and entry
"""
lower = entry.lower()
is_meta = any(map(lower.endswith, ('.egg-info', '.dist-info')))
return (
distributions_from_metadata
if is_meta else
find_distributions
if not only and _is_egg_path(entry) else
resolve_egg_link
if not only and lower.endswith('.egg-link') else
NoDists()
)
class NoDists:
"""
>>> bool(NoDists())
False
>>> list(NoDists()('anything'))
[]
"""
def __bool__(self):
return False
if six.PY2:
__nonzero__ = __bool__
def __call__(self, fullpath):
return iter(())
def safe_listdir(path):
"""
Attempt to list contents of path, but suppress some exceptions.
"""
try:
return os.listdir(path)
except (PermissionError, NotADirectoryError):
pass
except OSError as e:
# Ignore the directory if does not exist, not a directory or
# permission denied
ignorable = (
e.errno in (errno.ENOTDIR, errno.EACCES, errno.ENOENT)
# Python 2 on Windows needs to be handled this way :(
or getattr(e, "winerror", None) == 267
)
if not ignorable:
raise
return ()
def distributions_from_metadata(path):
root = os.path.dirname(path)
if os.path.isdir(path):
if len(os.listdir(path)) == 0:
# empty metadata dir; skip
return
metadata = PathMetadata(root, path)
else:
metadata = FileMetadata(path)
entry = os.path.basename(path)
yield Distribution.from_location(
root, entry, metadata, precedence=DEVELOP_DIST,
)
def non_empty_lines(path):
"""
Yield non-empty lines from file at path
"""
with open(path) as f:
for line in f:
line = line.strip()
if line:
yield line
def resolve_egg_link(path):
"""
Given a path to an .egg-link, resolve distributions
present in the referenced path.
"""
referenced_paths = non_empty_lines(path)
resolved_paths = (
os.path.join(os.path.dirname(path), ref)
for ref in referenced_paths
)
dist_groups = map(find_distributions, resolved_paths)
return next(dist_groups, ())
register_finder(pkgutil.ImpImporter, find_on_path)
if hasattr(importlib_machinery, 'FileFinder'):
register_finder(importlib_machinery.FileFinder, find_on_path)
_declare_state('dict', _namespace_handlers={})
_declare_state('dict', _namespace_packages={})
def register_namespace_handler(importer_type, namespace_handler):
"""Register `namespace_handler` to declare namespace packages
`importer_type` is the type or class of a PEP 302 "Importer" (sys.path item
handler), and `namespace_handler` is a callable like this::
def namespace_handler(importer, path_entry, moduleName, module):
# return a path_entry to use for child packages
Namespace handlers are only called if the importer object has already
agreed that it can handle the relevant path item, and they should only
return a subpath if the module __path__ does not already contain an
equivalent subpath. For an example namespace handler, see
``pkg_resources.file_ns_handler``.
"""
_namespace_handlers[importer_type] = namespace_handler
def _handle_ns(packageName, path_item):
"""Ensure that named package includes a subpath of path_item (if needed)"""
importer = get_importer(path_item)
if importer is None:
return None
loader = importer.find_module(packageName)
if loader is None:
return None
module = sys.modules.get(packageName)
if module is None:
module = sys.modules[packageName] = types.ModuleType(packageName)
module.__path__ = []
_set_parent_ns(packageName)
elif not hasattr(module, '__path__'):
raise TypeError("Not a package:", packageName)
handler = _find_adapter(_namespace_handlers, importer)
subpath = handler(importer, path_item, packageName, module)
if subpath is not None:
path = module.__path__
path.append(subpath)
loader.load_module(packageName)
_rebuild_mod_path(path, packageName, module)
return subpath
def _rebuild_mod_path(orig_path, package_name, module):
"""
Rebuild module.__path__ ensuring that all entries are ordered
corresponding to their sys.path order
"""
sys_path = [_normalize_cached(p) for p in sys.path]
def safe_sys_path_index(entry):
"""
Workaround for #520 and #513.
"""
try:
return sys_path.index(entry)
except ValueError:
return float('inf')
def position_in_sys_path(path):
"""
Return the ordinal of the path based on its position in sys.path
"""
path_parts = path.split(os.sep)
module_parts = package_name.count('.') + 1
parts = path_parts[:-module_parts]
return safe_sys_path_index(_normalize_cached(os.sep.join(parts)))
if not isinstance(orig_path, list):
# Is this behavior useful when module.__path__ is not a list?
return
orig_path.sort(key=position_in_sys_path)
module.__path__[:] = [_normalize_cached(p) for p in orig_path]
def declare_namespace(packageName):
"""Declare that package 'packageName' is a namespace package"""
_imp.acquire_lock()
try:
if packageName in _namespace_packages:
return
path, parent = sys.path, None
if '.' in packageName:
parent = '.'.join(packageName.split('.')[:-1])
declare_namespace(parent)
if parent not in _namespace_packages:
__import__(parent)
try:
path = sys.modules[parent].__path__
except AttributeError:
raise TypeError("Not a package:", parent)
# Track what packages are namespaces, so when new path items are added,
# they can be updated
_namespace_packages.setdefault(parent, []).append(packageName)
_namespace_packages.setdefault(packageName, [])
for path_item in path:
# Ensure all the parent's path items are reflected in the child,
# if they apply
_handle_ns(packageName, path_item)
finally:
_imp.release_lock()
def fixup_namespace_packages(path_item, parent=None):
"""Ensure that previously-declared namespace packages include path_item"""
_imp.acquire_lock()
try:
for package in _namespace_packages.get(parent, ()):
subpath = _handle_ns(package, path_item)
if subpath:
fixup_namespace_packages(subpath, package)
finally:
_imp.release_lock()
def file_ns_handler(importer, path_item, packageName, module):
"""Compute an ns-package subpath for a filesystem or zipfile importer"""
subpath = os.path.join(path_item, packageName.split('.')[-1])
normalized = _normalize_cached(subpath)
for item in module.__path__:
if _normalize_cached(item) == normalized:
break
else:
# Only return the path if it's not already there
return subpath
register_namespace_handler(pkgutil.ImpImporter, file_ns_handler)
register_namespace_handler(zipimport.zipimporter, file_ns_handler)
if hasattr(importlib_machinery, 'FileFinder'):
register_namespace_handler(importlib_machinery.FileFinder, file_ns_handler)
def null_ns_handler(importer, path_item, packageName, module):
return None
register_namespace_handler(object, null_ns_handler)
def normalize_path(filename):
"""Normalize a file/dir name for comparison purposes"""
return os.path.normcase(os.path.realpath(filename))
def _normalize_cached(filename, _cache={}):
try:
return _cache[filename]
except KeyError:
_cache[filename] = result = normalize_path(filename)
return result
def _is_egg_path(path):
"""
Determine if given path appears to be an egg.
"""
return path.lower().endswith('.egg')
def _is_unpacked_egg(path):
"""
Determine if given path appears to be an unpacked egg.
"""
return (
_is_egg_path(path) and
os.path.isfile(os.path.join(path, 'EGG-INFO', 'PKG-INFO'))
)
def _set_parent_ns(packageName):
parts = packageName.split('.')
name = parts.pop()
if parts:
parent = '.'.join(parts)
setattr(sys.modules[parent], name, sys.modules[packageName])
def yield_lines(strs):
"""Yield non-empty/non-comment lines of a string or sequence"""
if isinstance(strs, six.string_types):
for s in strs.splitlines():
s = s.strip()
# skip blank lines/comments
if s and not s.startswith('#'):
yield s
else:
for ss in strs:
for s in yield_lines(ss):
yield s
MODULE = re.compile(r"\w+(\.\w+)*$").match
EGG_NAME = re.compile(
r"""
(?P<name>[^-]+) (
-(?P<ver>[^-]+) (
-py(?P<pyver>[^-]+) (
-(?P<plat>.+)
)?
)?
)?
""",
re.VERBOSE | re.IGNORECASE,
).match
class EntryPoint(object):
"""Object representing an advertised importable object"""
def __init__(self, name, module_name, attrs=(), extras=(), dist=None):
if not MODULE(module_name):
raise ValueError("Invalid module name", module_name)
self.name = name
self.module_name = module_name
self.attrs = tuple(attrs)
self.extras = tuple(extras)
self.dist = dist
def __str__(self):
s = "%s = %s" % (self.name, self.module_name)
if self.attrs:
s += ':' + '.'.join(self.attrs)
if self.extras:
s += ' [%s]' % ','.join(self.extras)
return s
def __repr__(self):
return "EntryPoint.parse(%r)" % str(self)
def load(self, require=True, *args, **kwargs):
"""
Require packages for this EntryPoint, then resolve it.
"""
if not require or args or kwargs:
warnings.warn(
"Parameters to load are deprecated. Call .resolve and "
".require separately.",
DeprecationWarning,
stacklevel=2,
)
if require:
self.require(*args, **kwargs)
return self.resolve()
def resolve(self):
"""
Resolve the entry point from its module and attrs.
"""
module = __import__(self.module_name, fromlist=['__name__'], level=0)
try:
return functools.reduce(getattr, self.attrs, module)
except AttributeError as exc:
raise ImportError(str(exc))
def require(self, env=None, installer=None):
if self.extras and not self.dist:
raise UnknownExtra("Can't require() without a distribution", self)
# Get the requirements for this entry point with all its extras and
# then resolve them. We have to pass `extras` along when resolving so
# that the working set knows what extras we want. Otherwise, for
# dist-info distributions, the working set will assume that the
# requirements for that extra are purely optional and skip over them.
reqs = self.dist.requires(self.extras)
items = working_set.resolve(reqs, env, installer, extras=self.extras)
list(map(working_set.add, items))
pattern = re.compile(
r'\s*'
r'(?P<name>.+?)\s*'
r'=\s*'
r'(?P<module>[\w.]+)\s*'
r'(:\s*(?P<attr>[\w.]+))?\s*'
r'(?P<extras>\[.*\])?\s*$'
)
@classmethod
def parse(cls, src, dist=None):
"""Parse a single entry point from string `src`
Entry point syntax follows the form::
name = some.module:some.attr [extra1, extra2]
The entry name and module name are required, but the ``:attrs`` and
``[extras]`` parts are optional
"""
m = cls.pattern.match(src)
if not m:
msg = "EntryPoint must be in 'name=module:attrs [extras]' format"
raise ValueError(msg, src)
res = m.groupdict()
extras = cls._parse_extras(res['extras'])
attrs = res['attr'].split('.') if res['attr'] else ()
return cls(res['name'], res['module'], attrs, extras, dist)
@classmethod
def _parse_extras(cls, extras_spec):
if not extras_spec:
return ()
req = Requirement.parse('x' + extras_spec)
if req.specs:
raise ValueError()
return req.extras
@classmethod
def parse_group(cls, group, lines, dist=None):
"""Parse an entry point group"""
if not MODULE(group):
raise ValueError("Invalid group name", group)
this = {}
for line in yield_lines(lines):
ep = cls.parse(line, dist)
if ep.name in this:
raise ValueError("Duplicate entry point", group, ep.name)
this[ep.name] = ep
return this
@classmethod
def parse_map(cls, data, dist=None):
"""Parse a map of entry point groups"""
if isinstance(data, dict):
data = data.items()
else:
data = split_sections(data)
maps = {}
for group, lines in data:
if group is None:
if not lines:
continue
raise ValueError("Entry points must be listed in groups")
group = group.strip()
if group in maps:
raise ValueError("Duplicate group name", group)
maps[group] = cls.parse_group(group, lines, dist)
return maps
def _remove_md5_fragment(location):
if not location:
return ''
parsed = urllib.parse.urlparse(location)
if parsed[-1].startswith('md5='):
return urllib.parse.urlunparse(parsed[:-1] + ('',))
return location
def _version_from_file(lines):
"""
Given an iterable of lines from a Metadata file, return
the value of the Version field, if present, or None otherwise.
"""
def is_version_line(line):
return line.lower().startswith('version:')
version_lines = filter(is_version_line, lines)
line = next(iter(version_lines), '')
_, _, value = line.partition(':')
return safe_version(value.strip()) or None
class Distribution(object):
"""Wrap an actual or potential sys.path entry w/metadata"""
PKG_INFO = 'PKG-INFO'
def __init__(
self, location=None, metadata=None, project_name=None,
version=None, py_version=PY_MAJOR, platform=None,
precedence=EGG_DIST):
self.project_name = safe_name(project_name or 'Unknown')
if version is not None:
self._version = safe_version(version)
self.py_version = py_version
self.platform = platform
self.location = location
self.precedence = precedence
self._provider = metadata or empty_provider
@classmethod
def from_location(cls, location, basename, metadata=None, **kw):
project_name, version, py_version, platform = [None] * 4
basename, ext = os.path.splitext(basename)
if ext.lower() in _distributionImpl:
cls = _distributionImpl[ext.lower()]
match = EGG_NAME(basename)
if match:
project_name, version, py_version, platform = match.group(
'name', 'ver', 'pyver', 'plat'
)
return cls(
location, metadata, project_name=project_name, version=version,
py_version=py_version, platform=platform, **kw
)._reload_version()
def _reload_version(self):
return self
@property
def hashcmp(self):
return (
self.parsed_version,
self.precedence,
self.key,
_remove_md5_fragment(self.location),
self.py_version or '',
self.platform or '',
)
def __hash__(self):
return hash(self.hashcmp)
def __lt__(self, other):
return self.hashcmp < other.hashcmp
def __le__(self, other):
return self.hashcmp <= other.hashcmp
def __gt__(self, other):
return self.hashcmp > other.hashcmp
def __ge__(self, other):
return self.hashcmp >= other.hashcmp
def __eq__(self, other):
if not isinstance(other, self.__class__):
# It's not a Distribution, so they are not equal
return False
return self.hashcmp == other.hashcmp
def __ne__(self, other):
return not self == other
# These properties have to be lazy so that we don't have to load any
# metadata until/unless it's actually needed. (i.e., some distributions
# may not know their name or version without loading PKG-INFO)
@property
def key(self):
try:
return self._key
except AttributeError:
self._key = key = self.project_name.lower()
return key
@property
def parsed_version(self):
if not hasattr(self, "_parsed_version"):
self._parsed_version = parse_version(self.version)
return self._parsed_version
def _warn_legacy_version(self):
LV = packaging.version.LegacyVersion
is_legacy = isinstance(self._parsed_version, LV)
if not is_legacy:
return
# While an empty version is technically a legacy version and
# is not a valid PEP 440 version, it's also unlikely to
# actually come from someone and instead it is more likely that
# it comes from setuptools attempting to parse a filename and
# including it in the list. So for that we'll gate this warning
# on if the version is anything at all or not.
if not self.version:
return
tmpl = textwrap.dedent("""
'{project_name} ({version})' is being parsed as a legacy,
non PEP 440,
version. You may find odd behavior and sort order.
In particular it will be sorted as less than 0.0. It
is recommended to migrate to PEP 440 compatible
versions.
""").strip().replace('\n', ' ')
warnings.warn(tmpl.format(**vars(self)), PEP440Warning)
@property
def version(self):
try:
return self._version
except AttributeError:
version = _version_from_file(self._get_metadata(self.PKG_INFO))
if version is None:
tmpl = "Missing 'Version:' header and/or %s file"
raise ValueError(tmpl % self.PKG_INFO, self)
return version
@property
def _dep_map(self):
"""
A map of extra to its list of (direct) requirements
for this distribution, including the null extra.
"""
try:
return self.__dep_map
except AttributeError:
self.__dep_map = self._filter_extras(self._build_dep_map())
return self.__dep_map
@staticmethod
def _filter_extras(dm):
"""
Given a mapping of extras to dependencies, strip off
environment markers and filter out any dependencies
not matching the markers.
"""
for extra in list(filter(None, dm)):
new_extra = extra
reqs = dm.pop(extra)
new_extra, _, marker = extra.partition(':')
fails_marker = marker and (
invalid_marker(marker)
or not evaluate_marker(marker)
)
if fails_marker:
reqs = []
new_extra = safe_extra(new_extra) or None
dm.setdefault(new_extra, []).extend(reqs)
return dm
def _build_dep_map(self):
dm = {}
for name in 'requires.txt', 'depends.txt':
for extra, reqs in split_sections(self._get_metadata(name)):
dm.setdefault(extra, []).extend(parse_requirements(reqs))
return dm
def requires(self, extras=()):
"""List of Requirements needed for this distro if `extras` are used"""
dm = self._dep_map
deps = []
deps.extend(dm.get(None, ()))
for ext in extras:
try:
deps.extend(dm[safe_extra(ext)])
except KeyError:
raise UnknownExtra(
"%s has no such extra feature %r" % (self, ext)
)
return deps
def _get_metadata(self, name):
if self.has_metadata(name):
for line in self.get_metadata_lines(name):
yield line
def activate(self, path=None, replace=False):
"""Ensure distribution is importable on `path` (default=sys.path)"""
if path is None:
path = sys.path
self.insert_on(path, replace=replace)
if path is sys.path:
fixup_namespace_packages(self.location)
for pkg in self._get_metadata('namespace_packages.txt'):
if pkg in sys.modules:
declare_namespace(pkg)
def egg_name(self):
"""Return what this distribution's standard .egg filename should be"""
filename = "%s-%s-py%s" % (
to_filename(self.project_name), to_filename(self.version),
self.py_version or PY_MAJOR
)
if self.platform:
filename += '-' + self.platform
return filename
def __repr__(self):
if self.location:
return "%s (%s)" % (self, self.location)
else:
return str(self)
def __str__(self):
try:
version = getattr(self, 'version', None)
except ValueError:
version = None
version = version or "[unknown version]"
return "%s %s" % (self.project_name, version)
def __getattr__(self, attr):
"""Delegate all unrecognized public attributes to .metadata provider"""
if attr.startswith('_'):
raise AttributeError(attr)
return getattr(self._provider, attr)
@classmethod
def from_filename(cls, filename, metadata=None, **kw):
return cls.from_location(
_normalize_cached(filename), os.path.basename(filename), metadata,
**kw
)
def as_requirement(self):
"""Return a ``Requirement`` that matches this distribution exactly"""
if isinstance(self.parsed_version, packaging.version.Version):
spec = "%s==%s" % (self.project_name, self.parsed_version)
else:
spec = "%s===%s" % (self.project_name, self.parsed_version)
return Requirement.parse(spec)
def load_entry_point(self, group, name):
"""Return the `name` entry point of `group` or raise ImportError"""
ep = self.get_entry_info(group, name)
if ep is None:
raise ImportError("Entry point %r not found" % ((group, name),))
return ep.load()
def get_entry_map(self, group=None):
"""Return the entry point map for `group`, or the full entry map"""
try:
ep_map = self._ep_map
except AttributeError:
ep_map = self._ep_map = EntryPoint.parse_map(
self._get_metadata('entry_points.txt'), self
)
if group is not None:
return ep_map.get(group, {})
return ep_map
def get_entry_info(self, group, name):
"""Return the EntryPoint object for `group`+`name`, or ``None``"""
return self.get_entry_map(group).get(name)
def insert_on(self, path, loc=None, replace=False):
"""Ensure self.location is on path
If replace=False (default):
- If location is already in path anywhere, do nothing.
- Else:
- If it's an egg and its parent directory is on path,
insert just ahead of the parent.
- Else: add to the end of path.
If replace=True:
- If location is already on path anywhere (not eggs)
or higher priority than its parent (eggs)
do nothing.
- Else:
- If it's an egg and its parent directory is on path,
insert just ahead of the parent,
removing any lower-priority entries.
- Else: add it to the front of path.
"""
loc = loc or self.location
if not loc:
return
nloc = _normalize_cached(loc)
bdir = os.path.dirname(nloc)
npath = [(p and _normalize_cached(p) or p) for p in path]
for p, item in enumerate(npath):
if item == nloc:
if replace:
break
else:
# don't modify path (even removing duplicates) if
# found and not replace
return
elif item == bdir and self.precedence == EGG_DIST:
# if it's an .egg, give it precedence over its directory
# UNLESS it's already been added to sys.path and replace=False
if (not replace) and nloc in npath[p:]:
return
if path is sys.path:
self.check_version_conflict()
path.insert(p, loc)
npath.insert(p, nloc)
break
else:
if path is sys.path:
self.check_version_conflict()
if replace:
path.insert(0, loc)
else:
path.append(loc)
return
# p is the spot where we found or inserted loc; now remove duplicates
while True:
try:
np = npath.index(nloc, p + 1)
except ValueError:
break
else:
del npath[np], path[np]
# ha!
p = np
return
def check_version_conflict(self):
if self.key == 'setuptools':
# ignore the inevitable setuptools self-conflicts :(
return
nsp = dict.fromkeys(self._get_metadata('namespace_packages.txt'))
loc = normalize_path(self.location)
for modname in self._get_metadata('top_level.txt'):
if (modname not in sys.modules or modname in nsp
or modname in _namespace_packages):
continue
if modname in ('pkg_resources', 'setuptools', 'site'):
continue
fn = getattr(sys.modules[modname], '__file__', None)
if fn and (normalize_path(fn).startswith(loc) or
fn.startswith(self.location)):
continue
issue_warning(
"Module %s was already imported from %s, but %s is being added"
" to sys.path" % (modname, fn, self.location),
)
def has_version(self):
try:
self.version
except ValueError:
issue_warning("Unbuilt egg for " + repr(self))
return False
return True
def clone(self, **kw):
"""Copy this distribution, substituting in any changed keyword args"""
names = 'project_name version py_version platform location precedence'
for attr in names.split():
kw.setdefault(attr, getattr(self, attr, None))
kw.setdefault('metadata', self._provider)
return self.__class__(**kw)
@property
def extras(self):
return [dep for dep in self._dep_map if dep]
class EggInfoDistribution(Distribution):
def _reload_version(self):
"""
Packages installed by distutils (e.g. numpy or scipy),
which uses an old safe_version, and so
their version numbers can get mangled when
converted to filenames (e.g., 1.11.0.dev0+2329eae to
1.11.0.dev0_2329eae). These distributions will not be
parsed properly
downstream by Distribution and safe_version, so
take an extra step and try to get the version number from
the metadata file itself instead of the filename.
"""
md_version = _version_from_file(self._get_metadata(self.PKG_INFO))
if md_version:
self._version = md_version
return self
class DistInfoDistribution(Distribution):
"""
Wrap an actual or potential sys.path entry
w/metadata, .dist-info style.
"""
PKG_INFO = 'METADATA'
EQEQ = re.compile(r"([\(,])\s*(\d.*?)\s*([,\)])")
@property
def _parsed_pkg_info(self):
"""Parse and cache metadata"""
try:
return self._pkg_info
except AttributeError:
metadata = self.get_metadata(self.PKG_INFO)
self._pkg_info = email.parser.Parser().parsestr(metadata)
return self._pkg_info
@property
def _dep_map(self):
try:
return self.__dep_map
except AttributeError:
self.__dep_map = self._compute_dependencies()
return self.__dep_map
def _compute_dependencies(self):
"""Recompute this distribution's dependencies."""
dm = self.__dep_map = {None: []}
reqs = []
# Including any condition expressions
for req in self._parsed_pkg_info.get_all('Requires-Dist') or []:
reqs.extend(parse_requirements(req))
def reqs_for_extra(extra):
for req in reqs:
if not req.marker or req.marker.evaluate({'extra': extra}):
yield req
common = frozenset(reqs_for_extra(None))
dm[None].extend(common)
for extra in self._parsed_pkg_info.get_all('Provides-Extra') or []:
s_extra = safe_extra(extra.strip())
dm[s_extra] = list(frozenset(reqs_for_extra(extra)) - common)
return dm
_distributionImpl = {
'.egg': Distribution,
'.egg-info': EggInfoDistribution,
'.dist-info': DistInfoDistribution,
}
def issue_warning(*args, **kw):
level = 1
g = globals()
try:
# find the first stack frame that is *not* code in
# the pkg_resources module, to use for the warning
while sys._getframe(level).f_globals is g:
level += 1
except ValueError:
pass
warnings.warn(stacklevel=level + 1, *args, **kw)
class RequirementParseError(ValueError):
def __str__(self):
return ' '.join(self.args)
def parse_requirements(strs):
"""Yield ``Requirement`` objects for each specification in `strs`
`strs` must be a string, or a (possibly-nested) iterable thereof.
"""
# create a steppable iterator, so we can handle \-continuations
lines = iter(yield_lines(strs))
for line in lines:
# Drop comments -- a hash without a space may be in a URL.
if ' #' in line:
line = line[:line.find(' #')]
# If there is a line continuation, drop it, and append the next line.
if line.endswith('\\'):
line = line[:-2].strip()
try:
line += next(lines)
except StopIteration:
return
yield Requirement(line)
class Requirement(packaging.requirements.Requirement):
def __init__(self, requirement_string):
"""DO NOT CALL THIS UNDOCUMENTED METHOD; use Requirement.parse()!"""
try:
super(Requirement, self).__init__(requirement_string)
except packaging.requirements.InvalidRequirement as e:
raise RequirementParseError(str(e))
self.unsafe_name = self.name
project_name = safe_name(self.name)
self.project_name, self.key = project_name, project_name.lower()
self.specs = [
(spec.operator, spec.version) for spec in self.specifier]
self.extras = tuple(map(safe_extra, self.extras))
self.hashCmp = (
self.key,
self.specifier,
frozenset(self.extras),
str(self.marker) if self.marker else None,
)
self.__hash = hash(self.hashCmp)
def __eq__(self, other):
return (
isinstance(other, Requirement) and
self.hashCmp == other.hashCmp
)
def __ne__(self, other):
return not self == other
def __contains__(self, item):
if isinstance(item, Distribution):
if item.key != self.key:
return False
item = item.version
# Allow prereleases always in order to match the previous behavior of
# this method. In the future this should be smarter and follow PEP 440
# more accurately.
return self.specifier.contains(item, prereleases=True)
def __hash__(self):
return self.__hash
def __repr__(self):
return "Requirement.parse(%r)" % str(self)
@staticmethod
def parse(s):
req, = parse_requirements(s)
return req
def _always_object(classes):
"""
Ensure object appears in the mro even
for old-style classes.
"""
if object not in classes:
return classes + (object,)
return classes
def _find_adapter(registry, ob):
"""Return an adapter factory for `ob` from `registry`"""
types = _always_object(inspect.getmro(getattr(ob, '__class__', type(ob))))
for t in types:
if t in registry:
return registry[t]
def ensure_directory(path):
"""Ensure that the parent directory of `path` exists"""
dirname = os.path.dirname(path)
py31compat.makedirs(dirname, exist_ok=True)
def _bypass_ensure_directory(path):
"""Sandbox-bypassing version of ensure_directory()"""
if not WRITE_SUPPORT:
raise IOError('"os.mkdir" not supported on this platform.')
dirname, filename = split(path)
if dirname and filename and not isdir(dirname):
_bypass_ensure_directory(dirname)
mkdir(dirname, 0o755)
def split_sections(s):
"""Split a string or iterable thereof into (section, content) pairs
Each ``section`` is a stripped version of the section header ("[section]")
and each ``content`` is a list of stripped lines excluding blank lines and
comment-only lines. If there are any such lines before the first section
header, they're returned in a first ``section`` of ``None``.
"""
section = None
content = []
for line in yield_lines(s):
if line.startswith("["):
if line.endswith("]"):
if section or content:
yield section, content
section = line[1:-1].strip()
content = []
else:
raise ValueError("Invalid section heading", line)
else:
content.append(line)
# wrap up last segment
yield section, content
def _mkstemp(*args, **kw):
old_open = os.open
try:
# temporarily bypass sandboxing
os.open = os_open
return tempfile.mkstemp(*args, **kw)
finally:
# and then put it back
os.open = old_open
# Silence the PEP440Warning by default, so that end users don't get hit by it
# randomly just because they use pkg_resources. We want to append the rule
# because we want earlier uses of filterwarnings to take precedence over this
# one.
warnings.filterwarnings("ignore", category=PEP440Warning, append=True)
# from jaraco.functools 1.3
def _call_aside(f, *args, **kwargs):
f(*args, **kwargs)
return f
@_call_aside
def _initialize(g=globals()):
"Set up global resource manager (deliberately not state-saved)"
manager = ResourceManager()
g['_manager'] = manager
g.update(
(name, getattr(manager, name))
for name in dir(manager)
if not name.startswith('_')
)
@_call_aside
def _initialize_master_working_set():
"""
Prepare the master working set and make the ``require()``
API available.
This function has explicit effects on the global state
of pkg_resources. It is intended to be invoked once at
the initialization of this module.
Invocation by other packages is unsupported and done
at their own risk.
"""
working_set = WorkingSet._build_master()
_declare_state('object', working_set=working_set)
require = working_set.require
iter_entry_points = working_set.iter_entry_points
add_activation_listener = working_set.subscribe
run_script = working_set.run_script
# backward compatibility
run_main = run_script
# Activate all distributions already on sys.path with replace=False and
# ensure that all distributions added to the working set in the future
# (e.g. by calling ``require()``) will get activated as well,
# with higher priority (replace=True).
tuple(
dist.activate(replace=False)
for dist in working_set
)
add_activation_listener(
lambda dist: dist.activate(replace=True),
existing=False,
)
working_set.entries = []
# match order
list(map(working_set.add_entry, sys.path))
globals().update(locals())
lib64/python3.6/pydoc_data/__init__.py 0000644 00000000000 15173111042 0013435 0 ustar 00 lib64/python3.8/turtledemo/__init__.py 0000644 00000000472 15173111735 0013542 0 ustar 00 """
--------------------------------------
About this viewer
--------------------------------------
Tiny demo viewer to view turtle graphics example scripts.
Quickly and dirtyly assembled by Gregor Lingl.
June, 2006
For more information see: turtledemo - Help
Have fun!
"""
lib64/python3.8/ctypes/__init__.py 0000644 00000042763 15173123513 0012673 0 ustar 00 """create and manipulate C data types in Python"""
import os as _os, sys as _sys
__version__ = "1.1.0"
from _ctypes import Union, Structure, Array
from _ctypes import _Pointer
from _ctypes import CFuncPtr as _CFuncPtr
from _ctypes import __version__ as _ctypes_version
from _ctypes import RTLD_LOCAL, RTLD_GLOBAL
from _ctypes import ArgumentError
from struct import calcsize as _calcsize
if __version__ != _ctypes_version:
raise Exception("Version number mismatch", __version__, _ctypes_version)
if _os.name == "nt":
from _ctypes import FormatError
DEFAULT_MODE = RTLD_LOCAL
if _os.name == "posix" and _sys.platform == "darwin":
# On OS X 10.3, we use RTLD_GLOBAL as default mode
# because RTLD_LOCAL does not work at least on some
# libraries. OS X 10.3 is Darwin 7, so we check for
# that.
if int(_os.uname().release.split('.')[0]) < 8:
DEFAULT_MODE = RTLD_GLOBAL
from _ctypes import FUNCFLAG_CDECL as _FUNCFLAG_CDECL, \
FUNCFLAG_PYTHONAPI as _FUNCFLAG_PYTHONAPI, \
FUNCFLAG_USE_ERRNO as _FUNCFLAG_USE_ERRNO, \
FUNCFLAG_USE_LASTERROR as _FUNCFLAG_USE_LASTERROR
# WINOLEAPI -> HRESULT
# WINOLEAPI_(type)
#
# STDMETHODCALLTYPE
#
# STDMETHOD(name)
# STDMETHOD_(type, name)
#
# STDAPICALLTYPE
def create_string_buffer(init, size=None):
"""create_string_buffer(aBytes) -> character array
create_string_buffer(anInteger) -> character array
create_string_buffer(aBytes, anInteger) -> character array
"""
if isinstance(init, bytes):
if size is None:
size = len(init)+1
_sys.audit("ctypes.create_string_buffer", init, size)
buftype = c_char * size
buf = buftype()
buf.value = init
return buf
elif isinstance(init, int):
_sys.audit("ctypes.create_string_buffer", None, init)
buftype = c_char * init
buf = buftype()
return buf
raise TypeError(init)
def c_buffer(init, size=None):
## "deprecated, use create_string_buffer instead"
## import warnings
## warnings.warn("c_buffer is deprecated, use create_string_buffer instead",
## DeprecationWarning, stacklevel=2)
return create_string_buffer(init, size)
_c_functype_cache = {}
def CFUNCTYPE(restype, *argtypes, **kw):
"""CFUNCTYPE(restype, *argtypes,
use_errno=False, use_last_error=False) -> function prototype.
restype: the result type
argtypes: a sequence specifying the argument types
The function prototype can be called in different ways to create a
callable object:
prototype(integer address) -> foreign function
prototype(callable) -> create and return a C callable function from callable
prototype(integer index, method name[, paramflags]) -> foreign function calling a COM method
prototype((ordinal number, dll object)[, paramflags]) -> foreign function exported by ordinal
prototype((function name, dll object)[, paramflags]) -> foreign function exported by name
"""
flags = _FUNCFLAG_CDECL
if kw.pop("use_errno", False):
flags |= _FUNCFLAG_USE_ERRNO
if kw.pop("use_last_error", False):
flags |= _FUNCFLAG_USE_LASTERROR
if kw:
raise ValueError("unexpected keyword argument(s) %s" % kw.keys())
try:
return _c_functype_cache[(restype, argtypes, flags)]
except KeyError:
class CFunctionType(_CFuncPtr):
_argtypes_ = argtypes
_restype_ = restype
_flags_ = flags
_c_functype_cache[(restype, argtypes, flags)] = CFunctionType
return CFunctionType
if _os.name == "nt":
from _ctypes import LoadLibrary as _dlopen
from _ctypes import FUNCFLAG_STDCALL as _FUNCFLAG_STDCALL
_win_functype_cache = {}
def WINFUNCTYPE(restype, *argtypes, **kw):
# docstring set later (very similar to CFUNCTYPE.__doc__)
flags = _FUNCFLAG_STDCALL
if kw.pop("use_errno", False):
flags |= _FUNCFLAG_USE_ERRNO
if kw.pop("use_last_error", False):
flags |= _FUNCFLAG_USE_LASTERROR
if kw:
raise ValueError("unexpected keyword argument(s) %s" % kw.keys())
try:
return _win_functype_cache[(restype, argtypes, flags)]
except KeyError:
class WinFunctionType(_CFuncPtr):
_argtypes_ = argtypes
_restype_ = restype
_flags_ = flags
_win_functype_cache[(restype, argtypes, flags)] = WinFunctionType
return WinFunctionType
if WINFUNCTYPE.__doc__:
WINFUNCTYPE.__doc__ = CFUNCTYPE.__doc__.replace("CFUNCTYPE", "WINFUNCTYPE")
elif _os.name == "posix":
from _ctypes import dlopen as _dlopen
from _ctypes import sizeof, byref, addressof, alignment, resize
from _ctypes import get_errno, set_errno
from _ctypes import _SimpleCData
def _check_size(typ, typecode=None):
# Check if sizeof(ctypes_type) against struct.calcsize. This
# should protect somewhat against a misconfigured libffi.
from struct import calcsize
if typecode is None:
# Most _type_ codes are the same as used in struct
typecode = typ._type_
actual, required = sizeof(typ), calcsize(typecode)
if actual != required:
raise SystemError("sizeof(%s) wrong: %d instead of %d" % \
(typ, actual, required))
class py_object(_SimpleCData):
_type_ = "O"
def __repr__(self):
try:
return super().__repr__()
except ValueError:
return "%s(<NULL>)" % type(self).__name__
_check_size(py_object, "P")
class c_short(_SimpleCData):
_type_ = "h"
_check_size(c_short)
class c_ushort(_SimpleCData):
_type_ = "H"
_check_size(c_ushort)
class c_long(_SimpleCData):
_type_ = "l"
_check_size(c_long)
class c_ulong(_SimpleCData):
_type_ = "L"
_check_size(c_ulong)
if _calcsize("i") == _calcsize("l"):
# if int and long have the same size, make c_int an alias for c_long
c_int = c_long
c_uint = c_ulong
else:
class c_int(_SimpleCData):
_type_ = "i"
_check_size(c_int)
class c_uint(_SimpleCData):
_type_ = "I"
_check_size(c_uint)
class c_float(_SimpleCData):
_type_ = "f"
_check_size(c_float)
class c_double(_SimpleCData):
_type_ = "d"
_check_size(c_double)
class c_longdouble(_SimpleCData):
_type_ = "g"
if sizeof(c_longdouble) == sizeof(c_double):
c_longdouble = c_double
if _calcsize("l") == _calcsize("q"):
# if long and long long have the same size, make c_longlong an alias for c_long
c_longlong = c_long
c_ulonglong = c_ulong
else:
class c_longlong(_SimpleCData):
_type_ = "q"
_check_size(c_longlong)
class c_ulonglong(_SimpleCData):
_type_ = "Q"
## def from_param(cls, val):
## return ('d', float(val), val)
## from_param = classmethod(from_param)
_check_size(c_ulonglong)
class c_ubyte(_SimpleCData):
_type_ = "B"
c_ubyte.__ctype_le__ = c_ubyte.__ctype_be__ = c_ubyte
# backward compatibility:
##c_uchar = c_ubyte
_check_size(c_ubyte)
class c_byte(_SimpleCData):
_type_ = "b"
c_byte.__ctype_le__ = c_byte.__ctype_be__ = c_byte
_check_size(c_byte)
class c_char(_SimpleCData):
_type_ = "c"
c_char.__ctype_le__ = c_char.__ctype_be__ = c_char
_check_size(c_char)
class c_char_p(_SimpleCData):
_type_ = "z"
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, c_void_p.from_buffer(self).value)
_check_size(c_char_p, "P")
class c_void_p(_SimpleCData):
_type_ = "P"
c_voidp = c_void_p # backwards compatibility (to a bug)
_check_size(c_void_p)
class c_bool(_SimpleCData):
_type_ = "?"
from _ctypes import POINTER, pointer, _pointer_type_cache
class c_wchar_p(_SimpleCData):
_type_ = "Z"
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, c_void_p.from_buffer(self).value)
class c_wchar(_SimpleCData):
_type_ = "u"
def _reset_cache():
_pointer_type_cache.clear()
_c_functype_cache.clear()
if _os.name == "nt":
_win_functype_cache.clear()
# _SimpleCData.c_wchar_p_from_param
POINTER(c_wchar).from_param = c_wchar_p.from_param
# _SimpleCData.c_char_p_from_param
POINTER(c_char).from_param = c_char_p.from_param
_pointer_type_cache[None] = c_void_p
def create_unicode_buffer(init, size=None):
"""create_unicode_buffer(aString) -> character array
create_unicode_buffer(anInteger) -> character array
create_unicode_buffer(aString, anInteger) -> character array
"""
if isinstance(init, str):
if size is None:
if sizeof(c_wchar) == 2:
# UTF-16 requires a surrogate pair (2 wchar_t) for non-BMP
# characters (outside [U+0000; U+FFFF] range). +1 for trailing
# NUL character.
size = sum(2 if ord(c) > 0xFFFF else 1 for c in init) + 1
else:
# 32-bit wchar_t (1 wchar_t per Unicode character). +1 for
# trailing NUL character.
size = len(init) + 1
_sys.audit("ctypes.create_unicode_buffer", init, size)
buftype = c_wchar * size
buf = buftype()
buf.value = init
return buf
elif isinstance(init, int):
_sys.audit("ctypes.create_unicode_buffer", None, init)
buftype = c_wchar * init
buf = buftype()
return buf
raise TypeError(init)
# XXX Deprecated
def SetPointerType(pointer, cls):
if _pointer_type_cache.get(cls, None) is not None:
raise RuntimeError("This type already exists in the cache")
if id(pointer) not in _pointer_type_cache:
raise RuntimeError("What's this???")
pointer.set_type(cls)
_pointer_type_cache[cls] = pointer
del _pointer_type_cache[id(pointer)]
# XXX Deprecated
def ARRAY(typ, len):
return typ * len
################################################################
class CDLL(object):
"""An instance of this class represents a loaded dll/shared
library, exporting functions using the standard C calling
convention (named 'cdecl' on Windows).
The exported functions can be accessed as attributes, or by
indexing with the function name. Examples:
<obj>.qsort -> callable object
<obj>['qsort'] -> callable object
Calling the functions releases the Python GIL during the call and
reacquires it afterwards.
"""
_func_flags_ = _FUNCFLAG_CDECL
_func_restype_ = c_int
# default values for repr
_name = '<uninitialized>'
_handle = 0
_FuncPtr = None
def __init__(self, name, mode=DEFAULT_MODE, handle=None,
use_errno=False,
use_last_error=False,
winmode=None):
self._name = name
flags = self._func_flags_
if use_errno:
flags |= _FUNCFLAG_USE_ERRNO
if use_last_error:
flags |= _FUNCFLAG_USE_LASTERROR
if _sys.platform.startswith("aix"):
"""When the name contains ".a(" and ends with ")",
e.g., "libFOO.a(libFOO.so)" - this is taken to be an
archive(member) syntax for dlopen(), and the mode is adjusted.
Otherwise, name is presented to dlopen() as a file argument.
"""
if name and name.endswith(")") and ".a(" in name:
mode |= ( _os.RTLD_MEMBER | _os.RTLD_NOW )
if _os.name == "nt":
if winmode is not None:
mode = winmode
else:
import nt
mode = nt._LOAD_LIBRARY_SEARCH_DEFAULT_DIRS
if '/' in name or '\\' in name:
self._name = nt._getfullpathname(self._name)
mode |= nt._LOAD_LIBRARY_SEARCH_DLL_LOAD_DIR
class _FuncPtr(_CFuncPtr):
_flags_ = flags
_restype_ = self._func_restype_
self._FuncPtr = _FuncPtr
if handle is None:
self._handle = _dlopen(self._name, mode)
else:
self._handle = handle
def __repr__(self):
return "<%s '%s', handle %x at %#x>" % \
(self.__class__.__name__, self._name,
(self._handle & (_sys.maxsize*2 + 1)),
id(self) & (_sys.maxsize*2 + 1))
def __getattr__(self, name):
if name.startswith('__') and name.endswith('__'):
raise AttributeError(name)
func = self.__getitem__(name)
setattr(self, name, func)
return func
def __getitem__(self, name_or_ordinal):
func = self._FuncPtr((name_or_ordinal, self))
if not isinstance(name_or_ordinal, int):
func.__name__ = name_or_ordinal
return func
class PyDLL(CDLL):
"""This class represents the Python library itself. It allows
accessing Python API functions. The GIL is not released, and
Python exceptions are handled correctly.
"""
_func_flags_ = _FUNCFLAG_CDECL | _FUNCFLAG_PYTHONAPI
if _os.name == "nt":
class WinDLL(CDLL):
"""This class represents a dll exporting functions using the
Windows stdcall calling convention.
"""
_func_flags_ = _FUNCFLAG_STDCALL
# XXX Hm, what about HRESULT as normal parameter?
# Mustn't it derive from c_long then?
from _ctypes import _check_HRESULT, _SimpleCData
class HRESULT(_SimpleCData):
_type_ = "l"
# _check_retval_ is called with the function's result when it
# is used as restype. It checks for the FAILED bit, and
# raises an OSError if it is set.
#
# The _check_retval_ method is implemented in C, so that the
# method definition itself is not included in the traceback
# when it raises an error - that is what we want (and Python
# doesn't have a way to raise an exception in the caller's
# frame).
_check_retval_ = _check_HRESULT
class OleDLL(CDLL):
"""This class represents a dll exporting functions using the
Windows stdcall calling convention, and returning HRESULT.
HRESULT error values are automatically raised as OSError
exceptions.
"""
_func_flags_ = _FUNCFLAG_STDCALL
_func_restype_ = HRESULT
class LibraryLoader(object):
def __init__(self, dlltype):
self._dlltype = dlltype
def __getattr__(self, name):
if name[0] == '_':
raise AttributeError(name)
dll = self._dlltype(name)
setattr(self, name, dll)
return dll
def __getitem__(self, name):
return getattr(self, name)
def LoadLibrary(self, name):
return self._dlltype(name)
cdll = LibraryLoader(CDLL)
pydll = LibraryLoader(PyDLL)
if _os.name == "nt":
pythonapi = PyDLL("python dll", None, _sys.dllhandle)
elif _sys.platform == "cygwin":
pythonapi = PyDLL("libpython%d.%d.dll" % _sys.version_info[:2])
else:
pythonapi = PyDLL(None)
if _os.name == "nt":
windll = LibraryLoader(WinDLL)
oledll = LibraryLoader(OleDLL)
GetLastError = windll.kernel32.GetLastError
from _ctypes import get_last_error, set_last_error
def WinError(code=None, descr=None):
if code is None:
code = GetLastError()
if descr is None:
descr = FormatError(code).strip()
return OSError(None, descr, None, code)
if sizeof(c_uint) == sizeof(c_void_p):
c_size_t = c_uint
c_ssize_t = c_int
elif sizeof(c_ulong) == sizeof(c_void_p):
c_size_t = c_ulong
c_ssize_t = c_long
elif sizeof(c_ulonglong) == sizeof(c_void_p):
c_size_t = c_ulonglong
c_ssize_t = c_longlong
# functions
from _ctypes import _memmove_addr, _memset_addr, _string_at_addr, _cast_addr
## void *memmove(void *, const void *, size_t);
memmove = CFUNCTYPE(c_void_p, c_void_p, c_void_p, c_size_t)(_memmove_addr)
## void *memset(void *, int, size_t)
memset = CFUNCTYPE(c_void_p, c_void_p, c_int, c_size_t)(_memset_addr)
def PYFUNCTYPE(restype, *argtypes):
class CFunctionType(_CFuncPtr):
_argtypes_ = argtypes
_restype_ = restype
_flags_ = _FUNCFLAG_CDECL | _FUNCFLAG_PYTHONAPI
return CFunctionType
_cast = PYFUNCTYPE(py_object, c_void_p, py_object, py_object)(_cast_addr)
def cast(obj, typ):
return _cast(obj, obj, typ)
_string_at = PYFUNCTYPE(py_object, c_void_p, c_int)(_string_at_addr)
def string_at(ptr, size=-1):
"""string_at(addr[, size]) -> string
Return the string at addr."""
return _string_at(ptr, size)
try:
from _ctypes import _wstring_at_addr
except ImportError:
pass
else:
_wstring_at = PYFUNCTYPE(py_object, c_void_p, c_int)(_wstring_at_addr)
def wstring_at(ptr, size=-1):
"""wstring_at(addr[, size]) -> string
Return the string at addr."""
return _wstring_at(ptr, size)
if _os.name == "nt": # COM stuff
def DllGetClassObject(rclsid, riid, ppv):
try:
ccom = __import__("comtypes.server.inprocserver", globals(), locals(), ['*'])
except ImportError:
return -2147221231 # CLASS_E_CLASSNOTAVAILABLE
else:
return ccom.DllGetClassObject(rclsid, riid, ppv)
def DllCanUnloadNow():
try:
ccom = __import__("comtypes.server.inprocserver", globals(), locals(), ['*'])
except ImportError:
return 0 # S_OK
return ccom.DllCanUnloadNow()
from ctypes._endian import BigEndianStructure, LittleEndianStructure
# Fill in specifically-sized types
c_int8 = c_byte
c_uint8 = c_ubyte
for kind in [c_short, c_int, c_long, c_longlong]:
if sizeof(kind) == 2: c_int16 = kind
elif sizeof(kind) == 4: c_int32 = kind
elif sizeof(kind) == 8: c_int64 = kind
for kind in [c_ushort, c_uint, c_ulong, c_ulonglong]:
if sizeof(kind) == 2: c_uint16 = kind
elif sizeof(kind) == 4: c_uint32 = kind
elif sizeof(kind) == 8: c_uint64 = kind
del(kind)
_reset_cache()
lib64/python3.6/json/__init__.py 0000644 00000034074 15173271615 0012336 0 ustar 00 r"""JSON (JavaScript Object Notation) <http://json.org> is a subset of
JavaScript syntax (ECMA-262 3rd edition) used as a lightweight data
interchange format.
:mod:`json` exposes an API familiar to users of the standard library
:mod:`marshal` and :mod:`pickle` modules. It is derived from a
version of the externally maintained simplejson library.
Encoding basic Python object hierarchies::
>>> import json
>>> json.dumps(['foo', {'bar': ('baz', None, 1.0, 2)}])
'["foo", {"bar": ["baz", null, 1.0, 2]}]'
>>> print(json.dumps("\"foo\bar"))
"\"foo\bar"
>>> print(json.dumps('\u1234'))
"\u1234"
>>> print(json.dumps('\\'))
"\\"
>>> print(json.dumps({"c": 0, "b": 0, "a": 0}, sort_keys=True))
{"a": 0, "b": 0, "c": 0}
>>> from io import StringIO
>>> io = StringIO()
>>> json.dump(['streaming API'], io)
>>> io.getvalue()
'["streaming API"]'
Compact encoding::
>>> import json
>>> from collections import OrderedDict
>>> mydict = OrderedDict([('4', 5), ('6', 7)])
>>> json.dumps([1,2,3,mydict], separators=(',', ':'))
'[1,2,3,{"4":5,"6":7}]'
Pretty printing::
>>> import json
>>> print(json.dumps({'4': 5, '6': 7}, sort_keys=True, indent=4))
{
"4": 5,
"6": 7
}
Decoding JSON::
>>> import json
>>> obj = ['foo', {'bar': ['baz', None, 1.0, 2]}]
>>> json.loads('["foo", {"bar":["baz", null, 1.0, 2]}]') == obj
True
>>> json.loads('"\\"foo\\bar"') == '"foo\x08ar'
True
>>> from io import StringIO
>>> io = StringIO('["streaming API"]')
>>> json.load(io)[0] == 'streaming API'
True
Specializing JSON object decoding::
>>> import json
>>> def as_complex(dct):
... if '__complex__' in dct:
... return complex(dct['real'], dct['imag'])
... return dct
...
>>> json.loads('{"__complex__": true, "real": 1, "imag": 2}',
... object_hook=as_complex)
(1+2j)
>>> from decimal import Decimal
>>> json.loads('1.1', parse_float=Decimal) == Decimal('1.1')
True
Specializing JSON object encoding::
>>> import json
>>> def encode_complex(obj):
... if isinstance(obj, complex):
... return [obj.real, obj.imag]
... raise TypeError(repr(obj) + " is not JSON serializable")
...
>>> json.dumps(2 + 1j, default=encode_complex)
'[2.0, 1.0]'
>>> json.JSONEncoder(default=encode_complex).encode(2 + 1j)
'[2.0, 1.0]'
>>> ''.join(json.JSONEncoder(default=encode_complex).iterencode(2 + 1j))
'[2.0, 1.0]'
Using json.tool from the shell to validate and pretty-print::
$ echo '{"json":"obj"}' | python -m json.tool
{
"json": "obj"
}
$ echo '{ 1.2:3.4}' | python -m json.tool
Expecting property name enclosed in double quotes: line 1 column 3 (char 2)
"""
__version__ = '2.0.9'
__all__ = [
'dump', 'dumps', 'load', 'loads',
'JSONDecoder', 'JSONDecodeError', 'JSONEncoder',
]
__author__ = 'Bob Ippolito <bob@redivi.com>'
from .decoder import JSONDecoder, JSONDecodeError
from .encoder import JSONEncoder
import codecs
_default_encoder = JSONEncoder(
skipkeys=False,
ensure_ascii=True,
check_circular=True,
allow_nan=True,
indent=None,
separators=None,
default=None,
)
def dump(obj, fp, *, skipkeys=False, ensure_ascii=True, check_circular=True,
allow_nan=True, cls=None, indent=None, separators=None,
default=None, sort_keys=False, **kw):
"""Serialize ``obj`` as a JSON formatted stream to ``fp`` (a
``.write()``-supporting file-like object).
If ``skipkeys`` is true then ``dict`` keys that are not basic types
(``str``, ``int``, ``float``, ``bool``, ``None``) will be skipped
instead of raising a ``TypeError``.
If ``ensure_ascii`` is false, then the strings written to ``fp`` can
contain non-ASCII characters if they appear in strings contained in
``obj``. Otherwise, all such characters are escaped in JSON strings.
If ``check_circular`` is false, then the circular reference check
for container types will be skipped and a circular reference will
result in an ``OverflowError`` (or worse).
If ``allow_nan`` is false, then it will be a ``ValueError`` to
serialize out of range ``float`` values (``nan``, ``inf``, ``-inf``)
in strict compliance of the JSON specification, instead of using the
JavaScript equivalents (``NaN``, ``Infinity``, ``-Infinity``).
If ``indent`` is a non-negative integer, then JSON array elements and
object members will be pretty-printed with that indent level. An indent
level of 0 will only insert newlines. ``None`` is the most compact
representation.
If specified, ``separators`` should be an ``(item_separator, key_separator)``
tuple. The default is ``(', ', ': ')`` if *indent* is ``None`` and
``(',', ': ')`` otherwise. To get the most compact JSON representation,
you should specify ``(',', ':')`` to eliminate whitespace.
``default(obj)`` is a function that should return a serializable version
of obj or raise TypeError. The default simply raises TypeError.
If *sort_keys* is true (default: ``False``), then the output of
dictionaries will be sorted by key.
To use a custom ``JSONEncoder`` subclass (e.g. one that overrides the
``.default()`` method to serialize additional types), specify it with
the ``cls`` kwarg; otherwise ``JSONEncoder`` is used.
"""
# cached encoder
if (not skipkeys and ensure_ascii and
check_circular and allow_nan and
cls is None and indent is None and separators is None and
default is None and not sort_keys and not kw):
iterable = _default_encoder.iterencode(obj)
else:
if cls is None:
cls = JSONEncoder
iterable = cls(skipkeys=skipkeys, ensure_ascii=ensure_ascii,
check_circular=check_circular, allow_nan=allow_nan, indent=indent,
separators=separators,
default=default, sort_keys=sort_keys, **kw).iterencode(obj)
# could accelerate with writelines in some versions of Python, at
# a debuggability cost
for chunk in iterable:
fp.write(chunk)
def dumps(obj, *, skipkeys=False, ensure_ascii=True, check_circular=True,
allow_nan=True, cls=None, indent=None, separators=None,
default=None, sort_keys=False, **kw):
"""Serialize ``obj`` to a JSON formatted ``str``.
If ``skipkeys`` is true then ``dict`` keys that are not basic types
(``str``, ``int``, ``float``, ``bool``, ``None``) will be skipped
instead of raising a ``TypeError``.
If ``ensure_ascii`` is false, then the return value can contain non-ASCII
characters if they appear in strings contained in ``obj``. Otherwise, all
such characters are escaped in JSON strings.
If ``check_circular`` is false, then the circular reference check
for container types will be skipped and a circular reference will
result in an ``OverflowError`` (or worse).
If ``allow_nan`` is false, then it will be a ``ValueError`` to
serialize out of range ``float`` values (``nan``, ``inf``, ``-inf``) in
strict compliance of the JSON specification, instead of using the
JavaScript equivalents (``NaN``, ``Infinity``, ``-Infinity``).
If ``indent`` is a non-negative integer, then JSON array elements and
object members will be pretty-printed with that indent level. An indent
level of 0 will only insert newlines. ``None`` is the most compact
representation.
If specified, ``separators`` should be an ``(item_separator, key_separator)``
tuple. The default is ``(', ', ': ')`` if *indent* is ``None`` and
``(',', ': ')`` otherwise. To get the most compact JSON representation,
you should specify ``(',', ':')`` to eliminate whitespace.
``default(obj)`` is a function that should return a serializable version
of obj or raise TypeError. The default simply raises TypeError.
If *sort_keys* is true (default: ``False``), then the output of
dictionaries will be sorted by key.
To use a custom ``JSONEncoder`` subclass (e.g. one that overrides the
``.default()`` method to serialize additional types), specify it with
the ``cls`` kwarg; otherwise ``JSONEncoder`` is used.
"""
# cached encoder
if (not skipkeys and ensure_ascii and
check_circular and allow_nan and
cls is None and indent is None and separators is None and
default is None and not sort_keys and not kw):
return _default_encoder.encode(obj)
if cls is None:
cls = JSONEncoder
return cls(
skipkeys=skipkeys, ensure_ascii=ensure_ascii,
check_circular=check_circular, allow_nan=allow_nan, indent=indent,
separators=separators, default=default, sort_keys=sort_keys,
**kw).encode(obj)
_default_decoder = JSONDecoder(object_hook=None, object_pairs_hook=None)
def detect_encoding(b):
bstartswith = b.startswith
if bstartswith((codecs.BOM_UTF32_BE, codecs.BOM_UTF32_LE)):
return 'utf-32'
if bstartswith((codecs.BOM_UTF16_BE, codecs.BOM_UTF16_LE)):
return 'utf-16'
if bstartswith(codecs.BOM_UTF8):
return 'utf-8-sig'
if len(b) >= 4:
if not b[0]:
# 00 00 -- -- - utf-32-be
# 00 XX -- -- - utf-16-be
return 'utf-16-be' if b[1] else 'utf-32-be'
if not b[1]:
# XX 00 00 00 - utf-32-le
# XX 00 00 XX - utf-16-le
# XX 00 XX -- - utf-16-le
return 'utf-16-le' if b[2] or b[3] else 'utf-32-le'
elif len(b) == 2:
if not b[0]:
# 00 XX - utf-16-be
return 'utf-16-be'
if not b[1]:
# XX 00 - utf-16-le
return 'utf-16-le'
# default
return 'utf-8'
def load(fp, *, cls=None, object_hook=None, parse_float=None,
parse_int=None, parse_constant=None, object_pairs_hook=None, **kw):
"""Deserialize ``fp`` (a ``.read()``-supporting file-like object containing
a JSON document) to a Python object.
``object_hook`` is an optional function that will be called with the
result of any object literal decode (a ``dict``). The return value of
``object_hook`` will be used instead of the ``dict``. This feature
can be used to implement custom decoders (e.g. JSON-RPC class hinting).
``object_pairs_hook`` is an optional function that will be called with the
result of any object literal decoded with an ordered list of pairs. The
return value of ``object_pairs_hook`` will be used instead of the ``dict``.
This feature can be used to implement custom decoders that rely on the
order that the key and value pairs are decoded (for example,
collections.OrderedDict will remember the order of insertion). If
``object_hook`` is also defined, the ``object_pairs_hook`` takes priority.
To use a custom ``JSONDecoder`` subclass, specify it with the ``cls``
kwarg; otherwise ``JSONDecoder`` is used.
"""
return loads(fp.read(),
cls=cls, object_hook=object_hook,
parse_float=parse_float, parse_int=parse_int,
parse_constant=parse_constant, object_pairs_hook=object_pairs_hook, **kw)
def loads(s, *, encoding=None, cls=None, object_hook=None, parse_float=None,
parse_int=None, parse_constant=None, object_pairs_hook=None, **kw):
"""Deserialize ``s`` (a ``str``, ``bytes`` or ``bytearray`` instance
containing a JSON document) to a Python object.
``object_hook`` is an optional function that will be called with the
result of any object literal decode (a ``dict``). The return value of
``object_hook`` will be used instead of the ``dict``. This feature
can be used to implement custom decoders (e.g. JSON-RPC class hinting).
``object_pairs_hook`` is an optional function that will be called with the
result of any object literal decoded with an ordered list of pairs. The
return value of ``object_pairs_hook`` will be used instead of the ``dict``.
This feature can be used to implement custom decoders that rely on the
order that the key and value pairs are decoded (for example,
collections.OrderedDict will remember the order of insertion). If
``object_hook`` is also defined, the ``object_pairs_hook`` takes priority.
``parse_float``, if specified, will be called with the string
of every JSON float to be decoded. By default this is equivalent to
float(num_str). This can be used to use another datatype or parser
for JSON floats (e.g. decimal.Decimal).
``parse_int``, if specified, will be called with the string
of every JSON int to be decoded. By default this is equivalent to
int(num_str). This can be used to use another datatype or parser
for JSON integers (e.g. float).
``parse_constant``, if specified, will be called with one of the
following strings: -Infinity, Infinity, NaN.
This can be used to raise an exception if invalid JSON numbers
are encountered.
To use a custom ``JSONDecoder`` subclass, specify it with the ``cls``
kwarg; otherwise ``JSONDecoder`` is used.
The ``encoding`` argument is ignored and deprecated.
"""
if isinstance(s, str):
if s.startswith('\ufeff'):
raise JSONDecodeError("Unexpected UTF-8 BOM (decode using utf-8-sig)",
s, 0)
else:
if not isinstance(s, (bytes, bytearray)):
raise TypeError('the JSON object must be str, bytes or bytearray, '
'not {!r}'.format(s.__class__.__name__))
s = s.decode(detect_encoding(s), 'surrogatepass')
if (cls is None and object_hook is None and
parse_int is None and parse_float is None and
parse_constant is None and object_pairs_hook is None and not kw):
return _default_decoder.decode(s)
if cls is None:
cls = JSONDecoder
if object_hook is not None:
kw['object_hook'] = object_hook
if object_pairs_hook is not None:
kw['object_pairs_hook'] = object_pairs_hook
if parse_float is not None:
kw['parse_float'] = parse_float
if parse_int is not None:
kw['parse_int'] = parse_int
if parse_constant is not None:
kw['parse_constant'] = parse_constant
return cls(**kw).decode(s)
lib64/python3.8/distutils/__init__.py 0000644 00000000354 15173301775 0013406 0 ustar 00 """distutils
The main package for the Python Module Distribution Utilities. Normally
used from a setup script as
from distutils.core import setup
setup (...)
"""
import sys
__version__ = sys.version[:sys.version.index(' ')]
lib64/python2.7/hotshot/__init__.py 0000644 00000005156 15173302370 0013045 0 ustar 00 """High-perfomance logging profiler, mostly written in C."""
import _hotshot
from _hotshot import ProfilerError
from warnings import warnpy3k as _warnpy3k
_warnpy3k("The 'hotshot' module is not supported in 3.x, "
"use the 'profile' module instead.", stacklevel=2)
class Profile:
def __init__(self, logfn, lineevents=0, linetimings=1):
self.lineevents = lineevents and 1 or 0
self.linetimings = (linetimings and lineevents) and 1 or 0
self._prof = p = _hotshot.profiler(
logfn, self.lineevents, self.linetimings)
# Attempt to avoid confusing results caused by the presence of
# Python wrappers around these functions, but only if we can
# be sure the methods have not been overridden or extended.
if self.__class__ is Profile:
self.close = p.close
self.start = p.start
self.stop = p.stop
self.addinfo = p.addinfo
def close(self):
"""Close the logfile and terminate the profiler."""
self._prof.close()
def fileno(self):
"""Return the file descriptor of the profiler's log file."""
return self._prof.fileno()
def start(self):
"""Start the profiler."""
self._prof.start()
def stop(self):
"""Stop the profiler."""
self._prof.stop()
def addinfo(self, key, value):
"""Add an arbitrary labelled value to the profile log."""
self._prof.addinfo(key, value)
# These methods offer the same interface as the profile.Profile class,
# but delegate most of the work to the C implementation underneath.
def run(self, cmd):
"""Profile an exec-compatible string in the script
environment.
The globals from the __main__ module are used as both the
globals and locals for the script.
"""
import __main__
dict = __main__.__dict__
return self.runctx(cmd, dict, dict)
def runctx(self, cmd, globals, locals):
"""Evaluate an exec-compatible string in a specific
environment.
The string is compiled before profiling begins.
"""
code = compile(cmd, "<string>", "exec")
self._prof.runcode(code, globals, locals)
return self
def runcall(self, func, *args, **kw):
"""Profile a single call of a callable.
Additional positional and keyword arguments may be passed
along; the result of the call is returned, and exceptions are
allowed to propagate cleanly, while ensuring that profiling is
disabled on the way out.
"""
return self._prof.runcall(func, args, kw)
lib64/python3.6/asyncio/__init__.py 0000644 00000002634 15173303572 0013025 0 ustar 00 """The asyncio package, tracking PEP 3156."""
import sys
# The selectors module is in the stdlib in Python 3.4 but not in 3.3.
# Do this first, so the other submodules can use "from . import selectors".
# Prefer asyncio/selectors.py over the stdlib one, as ours may be newer.
try:
from . import selectors
except ImportError:
import selectors # Will also be exported.
if sys.platform == 'win32':
# Similar thing for _overlapped.
try:
from . import _overlapped
except ImportError:
import _overlapped # Will also be exported.
# This relies on each of the submodules having an __all__ variable.
from .base_events import *
from .coroutines import *
from .events import *
from .futures import *
from .locks import *
from .protocols import *
from .queues import *
from .streams import *
from .subprocess import *
from .tasks import *
from .transports import *
__all__ = (base_events.__all__ +
coroutines.__all__ +
events.__all__ +
futures.__all__ +
locks.__all__ +
protocols.__all__ +
queues.__all__ +
streams.__all__ +
subprocess.__all__ +
tasks.__all__ +
transports.__all__)
if sys.platform == 'win32': # pragma: no cover
from .windows_events import *
__all__ += windows_events.__all__
else:
from .unix_events import * # pragma: no cover
__all__ += unix_events.__all__
lib64/python3.6/xml/__init__.py 0000644 00000001055 15173307371 0012155 0 ustar 00 """Core XML support for Python.
This package contains four sub-packages:
dom -- The W3C Document Object Model. This supports DOM Level 1 +
Namespaces.
parsers -- Python wrappers for XML parsers (currently only supports Expat).
sax -- The Simple API for XML, developed by XML-Dev, led by David
Megginson and ported to Python by Lars Marius Garshol. This
supports the SAX 2 API.
etree -- The ElementTree XML library. This is a subset of the full
ElementTree XML release.
"""
__all__ = ["dom", "parsers", "sax", "etree"]
lib64/python2.7/unittest/__init__.py 0000644 00000005336 15173320752 0013240 0 ustar 00 """
Python unit testing framework, based on Erich Gamma's JUnit and Kent Beck's
Smalltalk testing framework.
This module contains the core framework classes that form the basis of
specific test cases and suites (TestCase, TestSuite etc.), and also a
text-based utility class for running the tests and reporting the results
(TextTestRunner).
Simple usage:
import unittest
class IntegerArithmeticTestCase(unittest.TestCase):
def testAdd(self): ## test method names begin 'test*'
self.assertEqual((1 + 2), 3)
self.assertEqual(0 + 1, 1)
def testMultiply(self):
self.assertEqual((0 * 10), 0)
self.assertEqual((5 * 8), 40)
if __name__ == '__main__':
unittest.main()
Further information is available in the bundled documentation, and from
http://docs.python.org/library/unittest.html
Copyright (c) 1999-2003 Steve Purcell
Copyright (c) 2003-2010 Python Software Foundation
This module is free software, and you may redistribute it and/or modify
it under the same terms as Python itself, so long as this copyright message
and disclaimer are retained in their original form.
IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
DAMAGE.
THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
"""
__all__ = ['TestResult', 'TestCase', 'TestSuite',
'TextTestRunner', 'TestLoader', 'FunctionTestCase', 'main',
'defaultTestLoader', 'SkipTest', 'skip', 'skipIf', 'skipUnless',
'expectedFailure', 'TextTestResult', 'installHandler',
'registerResult', 'removeResult', 'removeHandler']
# Expose obsolete functions for backwards compatibility
__all__.extend(['getTestCaseNames', 'makeSuite', 'findTestCases'])
__unittest = True
from .result import TestResult
from .case import (TestCase, FunctionTestCase, SkipTest, skip, skipIf,
skipUnless, expectedFailure,
_skipInRpmBuild, _expectedFailureInRpmBuild)
from .suite import BaseTestSuite, TestSuite
from .loader import (TestLoader, defaultTestLoader, makeSuite, getTestCaseNames,
findTestCases)
from .main import TestProgram, main
from .runner import TextTestRunner, TextTestResult
from .signals import installHandler, registerResult, removeResult, removeHandler
# deprecated
_TextTestResult = TextTestResult
lib64/python2.7/ctypes/__init__.py 0000644 00000041366 15173335610 0012672 0 ustar 00 """create and manipulate C data types in Python"""
import os as _os, sys as _sys
__version__ = "1.1.0"
from _ctypes import Union, Structure, Array
from _ctypes import _Pointer
from _ctypes import CFuncPtr as _CFuncPtr
from _ctypes import __version__ as _ctypes_version
from _ctypes import RTLD_LOCAL, RTLD_GLOBAL
from _ctypes import ArgumentError
from struct import calcsize as _calcsize
if __version__ != _ctypes_version:
raise Exception("Version number mismatch", __version__, _ctypes_version)
if _os.name in ("nt", "ce"):
from _ctypes import FormatError
DEFAULT_MODE = RTLD_LOCAL
if _os.name == "posix" and _sys.platform == "darwin":
# On OS X 10.3, we use RTLD_GLOBAL as default mode
# because RTLD_LOCAL does not work at least on some
# libraries. OS X 10.3 is Darwin 7, so we check for
# that.
if int(_os.uname()[2].split('.')[0]) < 8:
DEFAULT_MODE = RTLD_GLOBAL
from _ctypes import FUNCFLAG_CDECL as _FUNCFLAG_CDECL, \
FUNCFLAG_PYTHONAPI as _FUNCFLAG_PYTHONAPI, \
FUNCFLAG_USE_ERRNO as _FUNCFLAG_USE_ERRNO, \
FUNCFLAG_USE_LASTERROR as _FUNCFLAG_USE_LASTERROR
"""
WINOLEAPI -> HRESULT
WINOLEAPI_(type)
STDMETHODCALLTYPE
STDMETHOD(name)
STDMETHOD_(type, name)
STDAPICALLTYPE
"""
def create_string_buffer(init, size=None):
"""create_string_buffer(aString) -> character array
create_string_buffer(anInteger) -> character array
create_string_buffer(aString, anInteger) -> character array
"""
if isinstance(init, (str, unicode)):
if size is None:
size = len(init)+1
buftype = c_char * size
buf = buftype()
buf.value = init
return buf
elif isinstance(init, (int, long)):
buftype = c_char * init
buf = buftype()
return buf
raise TypeError(init)
def c_buffer(init, size=None):
## "deprecated, use create_string_buffer instead"
## import warnings
## warnings.warn("c_buffer is deprecated, use create_string_buffer instead",
## DeprecationWarning, stacklevel=2)
return create_string_buffer(init, size)
_c_functype_cache = {}
def CFUNCTYPE(restype, *argtypes, **kw):
"""CFUNCTYPE(restype, *argtypes,
use_errno=False, use_last_error=False) -> function prototype.
restype: the result type
argtypes: a sequence specifying the argument types
The function prototype can be called in different ways to create a
callable object:
prototype(integer address) -> foreign function
prototype(callable) -> create and return a C callable function from callable
prototype(integer index, method name[, paramflags]) -> foreign function calling a COM method
prototype((ordinal number, dll object)[, paramflags]) -> foreign function exported by ordinal
prototype((function name, dll object)[, paramflags]) -> foreign function exported by name
"""
flags = _FUNCFLAG_CDECL
if kw.pop("use_errno", False):
flags |= _FUNCFLAG_USE_ERRNO
if kw.pop("use_last_error", False):
flags |= _FUNCFLAG_USE_LASTERROR
if kw:
raise ValueError("unexpected keyword argument(s) %s" % kw.keys())
try:
return _c_functype_cache[(restype, argtypes, flags)]
except KeyError:
class CFunctionType(_CFuncPtr):
_argtypes_ = argtypes
_restype_ = restype
_flags_ = flags
_c_functype_cache[(restype, argtypes, flags)] = CFunctionType
return CFunctionType
if _os.name in ("nt", "ce"):
from _ctypes import LoadLibrary as _dlopen
from _ctypes import FUNCFLAG_STDCALL as _FUNCFLAG_STDCALL
if _os.name == "ce":
# 'ce' doesn't have the stdcall calling convention
_FUNCFLAG_STDCALL = _FUNCFLAG_CDECL
_win_functype_cache = {}
def WINFUNCTYPE(restype, *argtypes, **kw):
# docstring set later (very similar to CFUNCTYPE.__doc__)
flags = _FUNCFLAG_STDCALL
if kw.pop("use_errno", False):
flags |= _FUNCFLAG_USE_ERRNO
if kw.pop("use_last_error", False):
flags |= _FUNCFLAG_USE_LASTERROR
if kw:
raise ValueError("unexpected keyword argument(s) %s" % kw.keys())
try:
return _win_functype_cache[(restype, argtypes, flags)]
except KeyError:
class WinFunctionType(_CFuncPtr):
_argtypes_ = argtypes
_restype_ = restype
_flags_ = flags
_win_functype_cache[(restype, argtypes, flags)] = WinFunctionType
return WinFunctionType
if WINFUNCTYPE.__doc__:
WINFUNCTYPE.__doc__ = CFUNCTYPE.__doc__.replace("CFUNCTYPE", "WINFUNCTYPE")
elif _os.name == "posix":
from _ctypes import dlopen as _dlopen
from _ctypes import sizeof, byref, addressof, alignment, resize
from _ctypes import get_errno, set_errno
from _ctypes import _SimpleCData
def _check_size(typ, typecode=None):
# Check if sizeof(ctypes_type) against struct.calcsize. This
# should protect somewhat against a misconfigured libffi.
from struct import calcsize
if typecode is None:
# Most _type_ codes are the same as used in struct
typecode = typ._type_
actual, required = sizeof(typ), calcsize(typecode)
if actual != required:
raise SystemError("sizeof(%s) wrong: %d instead of %d" % \
(typ, actual, required))
class py_object(_SimpleCData):
_type_ = "O"
def __repr__(self):
try:
return super(py_object, self).__repr__()
except ValueError:
return "%s(<NULL>)" % type(self).__name__
_check_size(py_object, "P")
class c_short(_SimpleCData):
_type_ = "h"
_check_size(c_short)
class c_ushort(_SimpleCData):
_type_ = "H"
_check_size(c_ushort)
class c_long(_SimpleCData):
_type_ = "l"
_check_size(c_long)
class c_ulong(_SimpleCData):
_type_ = "L"
_check_size(c_ulong)
if _calcsize("i") == _calcsize("l"):
# if int and long have the same size, make c_int an alias for c_long
c_int = c_long
c_uint = c_ulong
else:
class c_int(_SimpleCData):
_type_ = "i"
_check_size(c_int)
class c_uint(_SimpleCData):
_type_ = "I"
_check_size(c_uint)
class c_float(_SimpleCData):
_type_ = "f"
_check_size(c_float)
class c_double(_SimpleCData):
_type_ = "d"
_check_size(c_double)
class c_longdouble(_SimpleCData):
_type_ = "g"
if sizeof(c_longdouble) == sizeof(c_double):
c_longdouble = c_double
if _calcsize("l") == _calcsize("q"):
# if long and long long have the same size, make c_longlong an alias for c_long
c_longlong = c_long
c_ulonglong = c_ulong
else:
class c_longlong(_SimpleCData):
_type_ = "q"
_check_size(c_longlong)
class c_ulonglong(_SimpleCData):
_type_ = "Q"
## def from_param(cls, val):
## return ('d', float(val), val)
## from_param = classmethod(from_param)
_check_size(c_ulonglong)
class c_ubyte(_SimpleCData):
_type_ = "B"
c_ubyte.__ctype_le__ = c_ubyte.__ctype_be__ = c_ubyte
# backward compatibility:
##c_uchar = c_ubyte
_check_size(c_ubyte)
class c_byte(_SimpleCData):
_type_ = "b"
c_byte.__ctype_le__ = c_byte.__ctype_be__ = c_byte
_check_size(c_byte)
class c_char(_SimpleCData):
_type_ = "c"
c_char.__ctype_le__ = c_char.__ctype_be__ = c_char
_check_size(c_char)
class c_char_p(_SimpleCData):
_type_ = "z"
if _os.name == "nt":
def __repr__(self):
if not windll.kernel32.IsBadStringPtrA(self, -1):
return "%s(%r)" % (self.__class__.__name__, self.value)
return "%s(%s)" % (self.__class__.__name__, cast(self, c_void_p).value)
else:
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, cast(self, c_void_p).value)
_check_size(c_char_p, "P")
class c_void_p(_SimpleCData):
_type_ = "P"
c_voidp = c_void_p # backwards compatibility (to a bug)
_check_size(c_void_p)
class c_bool(_SimpleCData):
_type_ = "?"
from _ctypes import POINTER, pointer, _pointer_type_cache
def _reset_cache():
_pointer_type_cache.clear()
_c_functype_cache.clear()
if _os.name in ("nt", "ce"):
_win_functype_cache.clear()
# _SimpleCData.c_wchar_p_from_param
POINTER(c_wchar).from_param = c_wchar_p.from_param
# _SimpleCData.c_char_p_from_param
POINTER(c_char).from_param = c_char_p.from_param
_pointer_type_cache[None] = c_void_p
try:
from _ctypes import set_conversion_mode
except ImportError:
pass
else:
if _os.name in ("nt", "ce"):
set_conversion_mode("mbcs", "ignore")
else:
set_conversion_mode("ascii", "strict")
class c_wchar_p(_SimpleCData):
_type_ = "Z"
class c_wchar(_SimpleCData):
_type_ = "u"
def create_unicode_buffer(init, size=None):
"""create_unicode_buffer(aString) -> character array
create_unicode_buffer(anInteger) -> character array
create_unicode_buffer(aString, anInteger) -> character array
"""
if isinstance(init, (str, unicode)):
if size is None:
size = len(init)+1
buftype = c_wchar * size
buf = buftype()
buf.value = init
return buf
elif isinstance(init, (int, long)):
buftype = c_wchar * init
buf = buftype()
return buf
raise TypeError(init)
# XXX Deprecated
def SetPointerType(pointer, cls):
if _pointer_type_cache.get(cls, None) is not None:
raise RuntimeError("This type already exists in the cache")
if id(pointer) not in _pointer_type_cache:
raise RuntimeError("What's this???")
pointer.set_type(cls)
_pointer_type_cache[cls] = pointer
del _pointer_type_cache[id(pointer)]
# XXX Deprecated
def ARRAY(typ, len):
return typ * len
################################################################
class CDLL(object):
"""An instance of this class represents a loaded dll/shared
library, exporting functions using the standard C calling
convention (named 'cdecl' on Windows).
The exported functions can be accessed as attributes, or by
indexing with the function name. Examples:
<obj>.qsort -> callable object
<obj>['qsort'] -> callable object
Calling the functions releases the Python GIL during the call and
reacquires it afterwards.
"""
_func_flags_ = _FUNCFLAG_CDECL
_func_restype_ = c_int
# default values for repr
_name = '<uninitialized>'
_handle = 0
_FuncPtr = None
def __init__(self, name, mode=DEFAULT_MODE, handle=None,
use_errno=False,
use_last_error=False):
self._name = name
flags = self._func_flags_
if use_errno:
flags |= _FUNCFLAG_USE_ERRNO
if use_last_error:
flags |= _FUNCFLAG_USE_LASTERROR
class _FuncPtr(_CFuncPtr):
_flags_ = flags
_restype_ = self._func_restype_
self._FuncPtr = _FuncPtr
if handle is None:
self._handle = _dlopen(self._name, mode)
else:
self._handle = handle
def __repr__(self):
return "<%s '%s', handle %x at %x>" % \
(self.__class__.__name__, self._name,
(self._handle & (_sys.maxint*2 + 1)),
id(self) & (_sys.maxint*2 + 1))
def __getattr__(self, name):
if name.startswith('__') and name.endswith('__'):
raise AttributeError(name)
func = self.__getitem__(name)
setattr(self, name, func)
return func
def __getitem__(self, name_or_ordinal):
func = self._FuncPtr((name_or_ordinal, self))
if not isinstance(name_or_ordinal, (int, long)):
func.__name__ = name_or_ordinal
return func
class PyDLL(CDLL):
"""This class represents the Python library itself. It allows
accessing Python API functions. The GIL is not released, and
Python exceptions are handled correctly.
"""
_func_flags_ = _FUNCFLAG_CDECL | _FUNCFLAG_PYTHONAPI
if _os.name in ("nt", "ce"):
class WinDLL(CDLL):
"""This class represents a dll exporting functions using the
Windows stdcall calling convention.
"""
_func_flags_ = _FUNCFLAG_STDCALL
# XXX Hm, what about HRESULT as normal parameter?
# Mustn't it derive from c_long then?
from _ctypes import _check_HRESULT, _SimpleCData
class HRESULT(_SimpleCData):
_type_ = "l"
# _check_retval_ is called with the function's result when it
# is used as restype. It checks for the FAILED bit, and
# raises a WindowsError if it is set.
#
# The _check_retval_ method is implemented in C, so that the
# method definition itself is not included in the traceback
# when it raises an error - that is what we want (and Python
# doesn't have a way to raise an exception in the caller's
# frame).
_check_retval_ = _check_HRESULT
class OleDLL(CDLL):
"""This class represents a dll exporting functions using the
Windows stdcall calling convention, and returning HRESULT.
HRESULT error values are automatically raised as WindowsError
exceptions.
"""
_func_flags_ = _FUNCFLAG_STDCALL
_func_restype_ = HRESULT
class LibraryLoader(object):
def __init__(self, dlltype):
self._dlltype = dlltype
def __getattr__(self, name):
if name[0] == '_':
raise AttributeError(name)
dll = self._dlltype(name)
setattr(self, name, dll)
return dll
def __getitem__(self, name):
return getattr(self, name)
def LoadLibrary(self, name):
return self._dlltype(name)
cdll = LibraryLoader(CDLL)
pydll = LibraryLoader(PyDLL)
if _os.name in ("nt", "ce"):
pythonapi = PyDLL("python dll", None, _sys.dllhandle)
elif _sys.platform == "cygwin":
pythonapi = PyDLL("libpython%d.%d.dll" % _sys.version_info[:2])
else:
pythonapi = PyDLL(None)
if _os.name in ("nt", "ce"):
windll = LibraryLoader(WinDLL)
oledll = LibraryLoader(OleDLL)
if _os.name == "nt":
GetLastError = windll.kernel32.GetLastError
else:
GetLastError = windll.coredll.GetLastError
from _ctypes import get_last_error, set_last_error
def WinError(code=None, descr=None):
if code is None:
code = GetLastError()
if descr is None:
descr = FormatError(code).strip()
return WindowsError(code, descr)
if sizeof(c_uint) == sizeof(c_void_p):
c_size_t = c_uint
c_ssize_t = c_int
elif sizeof(c_ulong) == sizeof(c_void_p):
c_size_t = c_ulong
c_ssize_t = c_long
elif sizeof(c_ulonglong) == sizeof(c_void_p):
c_size_t = c_ulonglong
c_ssize_t = c_longlong
# functions
from _ctypes import _memmove_addr, _memset_addr, _string_at_addr, _cast_addr
## void *memmove(void *, const void *, size_t);
memmove = CFUNCTYPE(c_void_p, c_void_p, c_void_p, c_size_t)(_memmove_addr)
## void *memset(void *, int, size_t)
memset = CFUNCTYPE(c_void_p, c_void_p, c_int, c_size_t)(_memset_addr)
def PYFUNCTYPE(restype, *argtypes):
class CFunctionType(_CFuncPtr):
_argtypes_ = argtypes
_restype_ = restype
_flags_ = _FUNCFLAG_CDECL | _FUNCFLAG_PYTHONAPI
return CFunctionType
_cast = PYFUNCTYPE(py_object, c_void_p, py_object, py_object)(_cast_addr)
def cast(obj, typ):
return _cast(obj, obj, typ)
_string_at = PYFUNCTYPE(py_object, c_void_p, c_int)(_string_at_addr)
def string_at(ptr, size=-1):
"""string_at(addr[, size]) -> string
Return the string at addr."""
return _string_at(ptr, size)
try:
from _ctypes import _wstring_at_addr
except ImportError:
pass
else:
_wstring_at = PYFUNCTYPE(py_object, c_void_p, c_int)(_wstring_at_addr)
def wstring_at(ptr, size=-1):
"""wstring_at(addr[, size]) -> string
Return the string at addr."""
return _wstring_at(ptr, size)
if _os.name in ("nt", "ce"): # COM stuff
def DllGetClassObject(rclsid, riid, ppv):
try:
ccom = __import__("comtypes.server.inprocserver", globals(), locals(), ['*'])
except ImportError:
return -2147221231 # CLASS_E_CLASSNOTAVAILABLE
else:
return ccom.DllGetClassObject(rclsid, riid, ppv)
def DllCanUnloadNow():
try:
ccom = __import__("comtypes.server.inprocserver", globals(), locals(), ['*'])
except ImportError:
return 0 # S_OK
return ccom.DllCanUnloadNow()
from ctypes._endian import BigEndianStructure, LittleEndianStructure
# Fill in specifically-sized types
c_int8 = c_byte
c_uint8 = c_ubyte
for kind in [c_short, c_int, c_long, c_longlong]:
if sizeof(kind) == 2: c_int16 = kind
elif sizeof(kind) == 4: c_int32 = kind
elif sizeof(kind) == 8: c_int64 = kind
for kind in [c_ushort, c_uint, c_ulong, c_ulonglong]:
if sizeof(kind) == 2: c_uint16 = kind
elif sizeof(kind) == 4: c_uint32 = kind
elif sizeof(kind) == 8: c_uint64 = kind
del(kind)
_reset_cache()
lib64/python3.6/dbm/__init__.py 0000644 00000013227 15173340154 0012117 0 ustar 00 """Generic interface to all dbm clones.
Use
import dbm
d = dbm.open(file, 'w', 0o666)
The returned object is a dbm.gnu, dbm.ndbm or dbm.dumb object, dependent on the
type of database being opened (determined by the whichdb function) in the case
of an existing dbm. If the dbm does not exist and the create or new flag ('c'
or 'n') was specified, the dbm type will be determined by the availability of
the modules (tested in the above order).
It has the following interface (key and data are strings):
d[key] = data # store data at key (may override data at
# existing key)
data = d[key] # retrieve data at key (raise KeyError if no
# such key)
del d[key] # delete data stored at key (raises KeyError
# if no such key)
flag = key in d # true if the key exists
list = d.keys() # return a list of all existing keys (slow!)
Future versions may change the order in which implementations are
tested for existence, and add interfaces to other dbm-like
implementations.
"""
__all__ = ['open', 'whichdb', 'error']
import io
import os
import struct
import sys
class error(Exception):
pass
_names = ['dbm.gnu', 'dbm.ndbm', 'dbm.dumb']
_defaultmod = None
_modules = {}
error = (error, OSError)
try:
from dbm import ndbm
except ImportError:
ndbm = None
def open(file, flag='r', mode=0o666):
"""Open or create database at path given by *file*.
Optional argument *flag* can be 'r' (default) for read-only access, 'w'
for read-write access of an existing database, 'c' for read-write access
to a new or existing database, and 'n' for read-write access to a new
database.
Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
only if it doesn't exist; and 'n' always creates a new database.
"""
global _defaultmod
if _defaultmod is None:
for name in _names:
try:
mod = __import__(name, fromlist=['open'])
except ImportError:
continue
if not _defaultmod:
_defaultmod = mod
_modules[name] = mod
if not _defaultmod:
raise ImportError("no dbm clone found; tried %s" % _names)
# guess the type of an existing database, if not creating a new one
result = whichdb(file) if 'n' not in flag else None
if result is None:
# db doesn't exist or 'n' flag was specified to create a new db
if 'c' in flag or 'n' in flag:
# file doesn't exist and the new flag was used so use default type
mod = _defaultmod
else:
raise error[0]("need 'c' or 'n' flag to open new db")
elif result == "":
# db type cannot be determined
raise error[0]("db type could not be determined")
elif result not in _modules:
raise error[0]("db type is {0}, but the module is not "
"available".format(result))
else:
mod = _modules[result]
return mod.open(file, flag, mode)
def whichdb(filename):
"""Guess which db package to use to open a db file.
Return values:
- None if the database file can't be read;
- empty string if the file can be read but can't be recognized
- the name of the dbm submodule (e.g. "ndbm" or "gnu") if recognized.
Importing the given module may still fail, and opening the
database using that module may still fail.
"""
# Check for ndbm first -- this has a .pag and a .dir file
try:
f = io.open(filename + ".pag", "rb")
f.close()
f = io.open(filename + ".dir", "rb")
f.close()
return "dbm.ndbm"
except OSError:
# some dbm emulations based on Berkeley DB generate a .db file
# some do not, but they should be caught by the bsd checks
try:
f = io.open(filename + ".db", "rb")
f.close()
# guarantee we can actually open the file using dbm
# kind of overkill, but since we are dealing with emulations
# it seems like a prudent step
if ndbm is not None:
d = ndbm.open(filename)
d.close()
return "dbm.ndbm"
except OSError:
pass
# Check for dumbdbm next -- this has a .dir and a .dat file
try:
# First check for presence of files
os.stat(filename + ".dat")
size = os.stat(filename + ".dir").st_size
# dumbdbm files with no keys are empty
if size == 0:
return "dbm.dumb"
f = io.open(filename + ".dir", "rb")
try:
if f.read(1) in (b"'", b'"'):
return "dbm.dumb"
finally:
f.close()
except OSError:
pass
# See if the file exists, return None if not
try:
f = io.open(filename, "rb")
except OSError:
return None
with f:
# Read the start of the file -- the magic number
s16 = f.read(16)
s = s16[0:4]
# Return "" if not at least 4 bytes
if len(s) != 4:
return ""
# Convert to 4-byte int in native byte order -- return "" if impossible
try:
(magic,) = struct.unpack("=l", s)
except struct.error:
return ""
# Check for GNU dbm
if magic in (0x13579ace, 0x13579acd, 0x13579acf):
return "dbm.gnu"
# Later versions of Berkeley db hash file have a 12-byte pad in
# front of the file type
try:
(magic,) = struct.unpack("=l", s16[-4:])
except struct.error:
return ""
# Unknown
return ""
if __name__ == "__main__":
for filename in sys.argv[1:]:
print(whichdb(filename) or "UNKNOWN", filename)
lib64/python3.6/lib2to3/__init__.py 0000644 00000000007 15173366477 0012643 0 ustar 00 #empty
lib64/python3.8/urllib/__init__.py 0000644 00000000000 15173432663 0012640 0 ustar 00 lib64/python3.8/dbm/__init__.py 0000644 00000013317 15173502212 0012114 0 ustar 00 """Generic interface to all dbm clones.
Use
import dbm
d = dbm.open(file, 'w', 0o666)
The returned object is a dbm.gnu, dbm.ndbm or dbm.dumb object, dependent on the
type of database being opened (determined by the whichdb function) in the case
of an existing dbm. If the dbm does not exist and the create or new flag ('c'
or 'n') was specified, the dbm type will be determined by the availability of
the modules (tested in the above order).
It has the following interface (key and data are strings):
d[key] = data # store data at key (may override data at
# existing key)
data = d[key] # retrieve data at key (raise KeyError if no
# such key)
del d[key] # delete data stored at key (raises KeyError
# if no such key)
flag = key in d # true if the key exists
list = d.keys() # return a list of all existing keys (slow!)
Future versions may change the order in which implementations are
tested for existence, and add interfaces to other dbm-like
implementations.
"""
__all__ = ['open', 'whichdb', 'error']
import io
import os
import struct
import sys
class error(Exception):
pass
_names = ['dbm.gnu', 'dbm.ndbm', 'dbm.dumb']
_defaultmod = None
_modules = {}
error = (error, OSError)
try:
from dbm import ndbm
except ImportError:
ndbm = None
def open(file, flag='r', mode=0o666):
"""Open or create database at path given by *file*.
Optional argument *flag* can be 'r' (default) for read-only access, 'w'
for read-write access of an existing database, 'c' for read-write access
to a new or existing database, and 'n' for read-write access to a new
database.
Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
only if it doesn't exist; and 'n' always creates a new database.
"""
global _defaultmod
if _defaultmod is None:
for name in _names:
try:
mod = __import__(name, fromlist=['open'])
except ImportError:
continue
if not _defaultmod:
_defaultmod = mod
_modules[name] = mod
if not _defaultmod:
raise ImportError("no dbm clone found; tried %s" % _names)
# guess the type of an existing database, if not creating a new one
result = whichdb(file) if 'n' not in flag else None
if result is None:
# db doesn't exist or 'n' flag was specified to create a new db
if 'c' in flag or 'n' in flag:
# file doesn't exist and the new flag was used so use default type
mod = _defaultmod
else:
raise error[0]("db file doesn't exist; "
"use 'c' or 'n' flag to create a new db")
elif result == "":
# db type cannot be determined
raise error[0]("db type could not be determined")
elif result not in _modules:
raise error[0]("db type is {0}, but the module is not "
"available".format(result))
else:
mod = _modules[result]
return mod.open(file, flag, mode)
def whichdb(filename):
"""Guess which db package to use to open a db file.
Return values:
- None if the database file can't be read;
- empty string if the file can be read but can't be recognized
- the name of the dbm submodule (e.g. "ndbm" or "gnu") if recognized.
Importing the given module may still fail, and opening the
database using that module may still fail.
"""
# Check for ndbm first -- this has a .pag and a .dir file
try:
f = io.open(filename + ".pag", "rb")
f.close()
f = io.open(filename + ".dir", "rb")
f.close()
return "dbm.ndbm"
except OSError:
# some dbm emulations based on Berkeley DB generate a .db file
# some do not, but they should be caught by the bsd checks
try:
f = io.open(filename + ".db", "rb")
f.close()
# guarantee we can actually open the file using dbm
# kind of overkill, but since we are dealing with emulations
# it seems like a prudent step
if ndbm is not None:
d = ndbm.open(filename)
d.close()
return "dbm.ndbm"
except OSError:
pass
# Check for dumbdbm next -- this has a .dir and a .dat file
try:
# First check for presence of files
os.stat(filename + ".dat")
size = os.stat(filename + ".dir").st_size
# dumbdbm files with no keys are empty
if size == 0:
return "dbm.dumb"
f = io.open(filename + ".dir", "rb")
try:
if f.read(1) in (b"'", b'"'):
return "dbm.dumb"
finally:
f.close()
except OSError:
pass
# See if the file exists, return None if not
try:
f = io.open(filename, "rb")
except OSError:
return None
with f:
# Read the start of the file -- the magic number
s16 = f.read(16)
s = s16[0:4]
# Return "" if not at least 4 bytes
if len(s) != 4:
return ""
# Convert to 4-byte int in native byte order -- return "" if impossible
try:
(magic,) = struct.unpack("=l", s)
except struct.error:
return ""
# Check for GNU dbm
if magic in (0x13579ace, 0x13579acd, 0x13579acf):
return "dbm.gnu"
# Later versions of Berkeley db hash file have a 12-byte pad in
# front of the file type
try:
(magic,) = struct.unpack("=l", s16[-4:])
except struct.error:
return ""
# Unknown
return ""
if __name__ == "__main__":
for filename in sys.argv[1:]:
print(whichdb(filename) or "UNKNOWN", filename)