/home/lnzliplg/www/multiprocessing.zip
PK {��\�U�D D shared_memory.pynu �[��� """Provides shared memory for direct access across processes.
The API of this package is currently provisional. Refer to the
documentation for details.
"""
__all__ = [ 'SharedMemory', 'ShareableList' ]
from functools import partial
import mmap
import os
import errno
import struct
import secrets
if os.name == "nt":
import _winapi
_USE_POSIX = False
else:
import _posixshmem
_USE_POSIX = True
_O_CREX = os.O_CREAT | os.O_EXCL
# FreeBSD (and perhaps other BSDs) limit names to 14 characters.
_SHM_SAFE_NAME_LENGTH = 14
# Shared memory block name prefix
if _USE_POSIX:
_SHM_NAME_PREFIX = '/psm_'
else:
_SHM_NAME_PREFIX = 'wnsm_'
def _make_filename():
    """Return a random, prefixed name for a new shared memory object."""
    # Each random byte renders as two hex characters, so only half of the
    # remaining budget (after the prefix) may be random bytes.
    n_random_bytes = (_SHM_SAFE_NAME_LENGTH - len(_SHM_NAME_PREFIX)) // 2
    assert n_random_bytes >= 2, '_SHM_NAME_PREFIX too long'
    candidate = _SHM_NAME_PREFIX + secrets.token_hex(n_random_bytes)
    assert len(candidate) <= _SHM_SAFE_NAME_LENGTH
    return candidate
class SharedMemory:
    """Creates a new shared memory block or attaches to an existing
    shared memory block.

    Every shared memory block is assigned a unique name.  This enables
    one process to create a shared memory block with a particular name
    so that a different process can attach to that same shared memory
    block using that same name.

    As a resource for sharing data across processes, shared memory blocks
    may outlive the original process that created them.  When one process
    no longer needs access to a shared memory block that might still be
    needed by other processes, the close() method should be called.
    When a shared memory block is no longer needed by any process, the
    unlink() method should be called to ensure proper cleanup."""

    # Defaults; enables close() and unlink() to run without errors.
    _name = None
    _fd = -1
    _mmap = None
    _buf = None
    _flags = os.O_RDWR
    _mode = 0o600
    # POSIX shm names conventionally start with "/"; the slash is hidden
    # again by the `name` property when reporting back to the user.
    _prepend_leading_slash = True if _USE_POSIX else False

    def __init__(self, name=None, create=False, size=0):
        if not size >= 0:
            raise ValueError("'size' must be a positive integer")
        if create:
            # O_EXCL makes creation fail instead of silently attaching to
            # a preexisting block of the same name.
            self._flags = _O_CREX | os.O_RDWR
            if size == 0:
                raise ValueError("'size' must be a positive number different from zero")
        if name is None and not self._flags & os.O_EXCL:
            raise ValueError("'name' can only be None if create=True")

        if _USE_POSIX:

            # POSIX Shared Memory

            if name is None:
                # Keep generating random names until shm_open succeeds
                # with O_EXCL (i.e. the name was unused).
                while True:
                    name = _make_filename()
                    try:
                        self._fd = _posixshmem.shm_open(
                            name,
                            self._flags,
                            mode=self._mode
                        )
                    except FileExistsError:
                        continue
                    self._name = name
                    break
            else:
                name = "/" + name if self._prepend_leading_slash else name
                self._fd = _posixshmem.shm_open(
                    name,
                    self._flags,
                    mode=self._mode
                )
                self._name = name
            try:
                if create and size:
                    # Newly created segments start at size 0; grow to the
                    # requested size before mapping.
                    os.ftruncate(self._fd, size)
                # When attaching, adopt the block's actual size.
                stats = os.fstat(self._fd)
                size = stats.st_size
                self._mmap = mmap.mmap(self._fd, size)
            except OSError:
                self.unlink()
                raise

            # Let the resource tracker clean up leaked blocks at exit.
            from .resource_tracker import register
            register(self._name, "shared_memory")

        else:

            # Windows Named Shared Memory

            if create:
                while True:
                    temp_name = _make_filename() if name is None else name
                    # Create and reserve shared memory block with this name
                    # until it can be attached to by mmap.
                    h_map = _winapi.CreateFileMapping(
                        _winapi.INVALID_HANDLE_VALUE,
                        _winapi.NULL,
                        _winapi.PAGE_READWRITE,
                        (size >> 32) & 0xFFFFFFFF,  # high-order DWORD of size
                        size & 0xFFFFFFFF,          # low-order DWORD of size
                        temp_name
                    )
                    try:
                        last_error_code = _winapi.GetLastError()
                        if last_error_code == _winapi.ERROR_ALREADY_EXISTS:
                            if name is not None:
                                # Explicit name collision: surface it.
                                raise FileExistsError(
                                    errno.EEXIST,
                                    os.strerror(errno.EEXIST),
                                    name,
                                    _winapi.ERROR_ALREADY_EXISTS
                                )
                            else:
                                # Random name collided; try another.
                                continue
                        self._mmap = mmap.mmap(-1, size, tagname=temp_name)
                    finally:
                        _winapi.CloseHandle(h_map)
                    self._name = temp_name
                    break
            else:
                self._name = name
                # Dynamically determine the existing named shared memory
                # block's size which is likely a multiple of mmap.PAGESIZE.
                h_map = _winapi.OpenFileMapping(
                    _winapi.FILE_MAP_READ,
                    False,
                    name
                )
                try:
                    p_buf = _winapi.MapViewOfFile(
                        h_map,
                        _winapi.FILE_MAP_READ,
                        0,
                        0,
                        0
                    )
                finally:
                    _winapi.CloseHandle(h_map)
                size = _winapi.VirtualQuerySize(p_buf)
                self._mmap = mmap.mmap(-1, size, tagname=name)

        self._size = size
        self._buf = memoryview(self._mmap)

    def __del__(self):
        # Best-effort release of this process's view/handles; an OSError
        # here (e.g. during interpreter teardown) is deliberately ignored.
        try:
            self.close()
        except OSError:
            pass

    def __reduce__(self):
        # Unpickling attaches (create=False) to the existing block by name.
        return (
            self.__class__,
            (
                self.name,
                False,
                self.size,
            ),
        )

    def __repr__(self):
        return f'{self.__class__.__name__}({self.name!r}, size={self.size})'

    @property
    def buf(self):
        "A memoryview of contents of the shared memory block."
        return self._buf

    @property
    def name(self):
        "Unique name that identifies the shared memory block."
        reported_name = self._name
        # Hide the implementation-detail leading slash used on POSIX.
        if _USE_POSIX and self._prepend_leading_slash:
            if self._name.startswith("/"):
                reported_name = self._name[1:]
        return reported_name

    @property
    def size(self):
        "Size in bytes."
        return self._size

    def close(self):
        """Closes access to the shared memory from this instance but does
        not destroy the shared memory block."""
        # Release in dependency order: memoryview, then mmap, then fd.
        if self._buf is not None:
            self._buf.release()
            self._buf = None
        if self._mmap is not None:
            self._mmap.close()
            self._mmap = None
        if _USE_POSIX and self._fd >= 0:
            os.close(self._fd)
            self._fd = -1

    def unlink(self):
        """Requests that the underlying shared memory block be destroyed.

        In order to ensure proper cleanup of resources, unlink should be
        called once (and only once) across all processes which have access
        to the shared memory block."""
        # No-op on Windows: the OS destroys the block when the last
        # handle to it is closed.
        if _USE_POSIX and self._name:
            from .resource_tracker import unregister
            _posixshmem.shm_unlink(self._name)
            unregister(self._name, "shared_memory")
# Text encoding used by ShareableList both for str values and for the
# per-slot struct packing-format strings stored in the buffer.
_encoding = "utf8"
class ShareableList:
    """Pattern for a mutable list-like object shareable via a shared
    memory block.  It differs from the built-in list type in that these
    lists can not change their overall length (i.e. no append, insert,
    etc.)

    Because values are packed into a memoryview as bytes, the struct
    packing format for any storable value must require no more than 8
    characters to describe its format.

    Buffer layout:
        [list length: q][allocated bytes per slot: Nq]
        [packed data slots][8-byte packing format per slot]
        [1-byte back-transform code per slot]
    """

    # struct format per supported type; every non-string entry packs to
    # exactly 8 bytes so slot offsets are simple multiples of _alignment.
    _types_mapping = {
        int: "q",
        float: "d",
        bool: "xxxxxxx?",
        str: "%ds",
        bytes: "%ds",
        None.__class__: "xxxxxx?x",
    }
    _alignment = 8
    # Maps a stored "recreation code" back to a function that restores
    # the original Python object from the raw unpacked value.
    _back_transforms_mapping = {
        0: lambda value: value,                   # int, float, bool
        1: lambda value: value.rstrip(b'\x00').decode(_encoding),  # str
        2: lambda value: value.rstrip(b'\x00'),   # bytes
        3: lambda _value: None,                   # None
    }

    @staticmethod
    def _extract_recreation_code(value):
        """Used in concert with _back_transforms_mapping to convert values
        into the appropriate Python objects when retrieving them from
        the list as well as when storing them."""
        if not isinstance(value, (str, bytes, None.__class__)):
            return 0
        elif isinstance(value, str):
            return 1
        elif isinstance(value, bytes):
            return 2
        else:
            return 3  # NoneType

    def __init__(self, sequence=None, *, name=None):
        """Create a new list from *sequence*, or attach to the existing
        shared list named *name* when sequence is None."""
        if sequence is not None:
            # Per-item struct formats; str/bytes are padded up to the
            # next multiple of _alignment bytes.
            _formats = [
                self._types_mapping[type(item)]
                    if not isinstance(item, (str, bytes))
                    else self._types_mapping[type(item)] % (
                        self._alignment * (len(item) // self._alignment + 1),
                    )
                for item in sequence
            ]
            self._list_len = len(_formats)
            assert sum(len(fmt) <= 8 for fmt in _formats) == self._list_len
            self._allocated_bytes = tuple(
                    self._alignment if fmt[-1] != "s" else int(fmt[:-1])
                    for fmt in _formats
            )
            _recreation_codes = [
                self._extract_recreation_code(item) for item in sequence
            ]
            requested_size = struct.calcsize(
                "q" + self._format_size_metainfo +
                "".join(_formats) +
                self._format_packing_metainfo +
                self._format_back_transform_codes
            )
        else:
            requested_size = 8  # Some platforms require > 0.

        if name is not None and sequence is None:
            self.shm = SharedMemory(name)
        else:
            self.shm = SharedMemory(name, create=True, size=requested_size)

        if sequence is not None:
            _enc = _encoding
            # Metainfo header: list length followed by per-slot sizes.
            struct.pack_into(
                "q" + self._format_size_metainfo,
                self.shm.buf,
                0,
                self._list_len,
                *(self._allocated_bytes)
            )
            # The data slots themselves.
            struct.pack_into(
                "".join(_formats),
                self.shm.buf,
                self._offset_data_start,
                *(v.encode(_enc) if isinstance(v, str) else v for v in sequence)
            )
            # Per-slot packing formats (8 bytes each).
            struct.pack_into(
                self._format_packing_metainfo,
                self.shm.buf,
                self._offset_packing_formats,
                *(v.encode(_enc) for v in _formats)
            )
            # Per-slot back-transform codes (1 byte each).
            struct.pack_into(
                self._format_back_transform_codes,
                self.shm.buf,
                self._offset_back_transform_codes,
                *(_recreation_codes)
            )
        else:
            self._list_len = len(self)  # Obtains size from offset 0 in buffer.
            self._allocated_bytes = struct.unpack_from(
                self._format_size_metainfo,
                self.shm.buf,
                1 * 8
            )

    def _get_packing_format(self, position):
        "Gets the packing format for a single value stored in the list."
        position = position if position >= 0 else position + self._list_len
        # BUGFIX: was `self._list_len < 0` (always false); a position
        # below -list_len must also be rejected.
        if (position >= self._list_len) or (position < 0):
            raise IndexError("Requested position out of range.")

        v = struct.unpack_from(
            "8s",
            self.shm.buf,
            self._offset_packing_formats + position * 8
        )[0]
        fmt = v.rstrip(b'\x00')
        fmt_as_str = fmt.decode(_encoding)

        return fmt_as_str

    def _get_back_transform(self, position):
        "Gets the back transformation function for a single value."
        position = position if position >= 0 else position + self._list_len
        if (position >= self._list_len) or (position < 0):
            raise IndexError("Requested position out of range.")

        transform_code = struct.unpack_from(
            "b",
            self.shm.buf,
            self._offset_back_transform_codes + position
        )[0]
        transform_function = self._back_transforms_mapping[transform_code]

        return transform_function

    def _set_packing_format_and_transform(self, position, fmt_as_str, value):
        """Sets the packing format and back transformation code for a
        single value in the list at the specified position."""
        position = position if position >= 0 else position + self._list_len
        if (position >= self._list_len) or (position < 0):
            raise IndexError("Requested position out of range.")

        struct.pack_into(
            "8s",
            self.shm.buf,
            self._offset_packing_formats + position * 8,
            fmt_as_str.encode(_encoding)
        )

        transform_code = self._extract_recreation_code(value)
        struct.pack_into(
            "b",
            self.shm.buf,
            self._offset_back_transform_codes + position,
            transform_code
        )

    def __getitem__(self, position):
        # BUGFIX (bpo-41615): normalize negative indices *before*
        # computing the data offset; slicing _allocated_bytes with a raw
        # negative index addressed the wrong slot.
        position = position if position >= 0 else position + self._list_len
        try:
            offset = self._offset_data_start \
                     + sum(self._allocated_bytes[:position])
            (v,) = struct.unpack_from(
                self._get_packing_format(position),
                self.shm.buf,
                offset
            )
        except IndexError:
            raise IndexError("index out of range")

        back_transform = self._get_back_transform(position)
        v = back_transform(v)

        return v

    def __setitem__(self, position, value):
        # BUGFIX (bpo-41615): normalize negative indices before computing
        # the data offset (see __getitem__).
        position = position if position >= 0 else position + self._list_len
        try:
            offset = self._offset_data_start \
                     + sum(self._allocated_bytes[:position])
            current_format = self._get_packing_format(position)
        except IndexError:
            raise IndexError("assignment index out of range")

        if not isinstance(value, (str, bytes)):
            new_format = self._types_mapping[type(value)]
            encoded_value = value
        else:
            encoded_value = (value.encode(_encoding)
                             if isinstance(value, str) else value)
            # The slot width is fixed at creation; a longer value cannot fit.
            if len(encoded_value) > self._allocated_bytes[position]:
                raise ValueError("bytes/str item exceeds available storage")
            if current_format[-1] == "s":
                new_format = current_format
            else:
                new_format = self._types_mapping[str] % (
                    self._allocated_bytes[position],
                )

        self._set_packing_format_and_transform(
            position,
            new_format,
            value
        )
        struct.pack_into(new_format, self.shm.buf, offset, encoded_value)

    def __reduce__(self):
        # Unpickling re-attaches to the same shared memory block by name.
        return partial(self.__class__, name=self.shm.name), ()

    def __len__(self):
        return struct.unpack_from("q", self.shm.buf, 0)[0]

    def __repr__(self):
        return f'{self.__class__.__name__}({list(self)}, name={self.shm.name!r})'

    @property
    def format(self):
        "The struct packing format used by all currently stored values."
        return "".join(
            self._get_packing_format(i) for i in range(self._list_len)
        )

    @property
    def _format_size_metainfo(self):
        "The struct packing format used for metainfo on storage sizes."
        return f"{self._list_len}q"

    @property
    def _format_packing_metainfo(self):
        "The struct packing format used for the values' packing formats."
        return "8s" * self._list_len

    @property
    def _format_back_transform_codes(self):
        "The struct packing format used for the values' back transforms."
        return "b" * self._list_len

    @property
    def _offset_data_start(self):
        return (self._list_len + 1) * 8  # 8 bytes per "q"

    @property
    def _offset_packing_formats(self):
        return self._offset_data_start + sum(self._allocated_bytes)

    @property
    def _offset_back_transform_codes(self):
        return self._offset_packing_formats + self._list_len * 8

    def count(self, value):
        "L.count(value) -> integer -- return number of occurrences of value."
        return sum(value == entry for entry in self)

    def index(self, value):
        """L.index(value) -> integer -- return first index of value.

        Raises ValueError if the value is not present."""
        for position, entry in enumerate(self):
            if value == entry:
                return position
        else:
            raise ValueError(f"{value!r} not in this container")
PK {��\���}j- j- heap.pynu �[��� #
# Module which supports allocation of memory from an mmap
#
# multiprocessing/heap.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#
import bisect
from collections import defaultdict
import mmap
import os
import sys
import tempfile
import threading
from .context import reduction, assert_spawning
from . import util
__all__ = ['BufferWrapper']
#
# Inheritable class which wraps an mmap, and from which blocks can be allocated
#
if sys.platform == 'win32':

    import _winapi

    class Arena(object):
        """
        A shared memory area backed by anonymous memory (Windows).
        """

        # Process-local generator of unique-ish name suffixes.
        _rand = tempfile._RandomNameSequence()

        def __init__(self, size):
            self.size = size
            # Retry a bounded number of times in case a random tag name
            # happens to collide with an existing mapping.
            for i in range(100):
                name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
                buf = mmap.mmap(-1, size, tagname=name)
                if _winapi.GetLastError() == 0:
                    break
                # We have reopened a preexisting mmap.
                buf.close()
            else:
                raise FileExistsError('Cannot find name for new mmap')
            self.name = name
            self.buffer = buf
            self._state = (self.size, self.name)

        def __getstate__(self):
            # Only picklable when spawning a child process.
            assert_spawning(self)
            return self._state

        def __setstate__(self, state):
            self.size, self.name = self._state = state
            # Reopen existing mmap
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            # XXX Temporarily preventing buildbot failures while determining
            # XXX the correct long-term fix. See issue 23060
            #assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS

else:

    class Arena(object):
        """
        A shared memory area backed by a temporary file (POSIX).
        """

        if sys.platform == 'linux':
            _dir_candidates = ['/dev/shm']
        else:
            _dir_candidates = []

        def __init__(self, size, fd=-1):
            self.size = size
            self.fd = fd
            if fd == -1:
                # Arena is created anew (if fd != -1, it means we're coming
                # from rebuild_arena() below)
                self.fd, name = tempfile.mkstemp(
                    prefix='pym-%d-'%os.getpid(),
                    dir=self._choose_dir(size))
                # Unlink immediately: the fd keeps the file alive, and the
                # name never needs to be visible to anyone else.
                os.unlink(name)
                util.Finalize(self, os.close, (self.fd,))
            os.ftruncate(self.fd, size)
            self.buffer = mmap.mmap(self.fd, self.size)

        def _choose_dir(self, size):
            # Choose a non-storage backed directory if possible,
            # to improve performance
            for d in self._dir_candidates:
                st = os.statvfs(d)
                if st.f_bavail * st.f_frsize >= size:  # enough free space?
                    return d
            return util.get_temp_dir()
def reduce_arena(a):
    """Pickle support: reduce an Arena to (rebuild_arena, args).

    Only works for fd-backed arenas; the fd is duplicated so the child
    process receives its own handle."""
    if a.fd == -1:
        raise ValueError('Arena is unpicklable because '
                         'forking was enabled when it was created')
    duplicated_fd = reduction.DupFd(a.fd)
    return rebuild_arena, (a.size, duplicated_fd)


def rebuild_arena(size, dupfd):
    """Recreate an Arena in the receiving process from a duplicated fd."""
    return Arena(size, dupfd.detach())


reduction.register(Arena, reduce_arena)
#
# Class allowing allocation of chunks of memory from arenas
#
class Heap(object):
    """Allocator carving `(Arena, start, stop)` blocks out of mmap arenas.

    Free blocks are coalesced with their neighbours when returned and
    indexed by size so _malloc() can pick the smallest block that is
    large enough via bisect."""

    # Minimum malloc() alignment
    _alignment = 8

    _DISCARD_FREE_SPACE_LARGER_THAN = 4 * 1024 ** 2  # 4 MB
    _DOUBLE_ARENA_SIZE_UNTIL = 4 * 1024 ** 2

    def __init__(self, size=mmap.PAGESIZE):
        # Pid of the process that owns this heap; checked by free()/malloc()
        # so a forked child does not corrupt the parent's bookkeeping.
        self._lastpid = os.getpid()
        self._lock = threading.Lock()
        # Current arena allocation size
        self._size = size
        # A sorted list of available block sizes in arenas
        self._lengths = []

        # Free block management:
        # - map each block size to a list of `(Arena, start, stop)` blocks
        self._len_to_seq = {}
        # - map `(Arena, start)` tuple to the `(Arena, start, stop)` block
        #   starting at that offset
        self._start_to_block = {}
        # - map `(Arena, stop)` tuple to the `(Arena, start, stop)` block
        #   ending at that offset
        self._stop_to_block = {}

        # Map arenas to their `(Arena, start, stop)` blocks in use
        self._allocated_blocks = defaultdict(set)
        self._arenas = []

        # List of pending blocks to free - see comment in free() below
        self._pending_free_blocks = []

        # Statistics
        self._n_mallocs = 0
        self._n_frees = 0

    @staticmethod
    def _roundup(n, alignment):
        # alignment must be a power of 2
        mask = alignment - 1
        return (n + mask) & ~mask

    def _new_arena(self, size):
        # Create a new arena with at least the given *size*
        length = self._roundup(max(self._size, size), mmap.PAGESIZE)
        # We carve larger and larger arenas, for efficiency, until we
        # reach a large-ish size (roughly L3 cache-sized)
        if self._size < self._DOUBLE_ARENA_SIZE_UNTIL:
            self._size *= 2
        util.info('allocating a new mmap of length %d', length)
        arena = Arena(length)
        self._arenas.append(arena)
        # The whole new arena is returned as one free block.
        return (arena, 0, length)

    def _discard_arena(self, arena):
        # Possibly delete the given (unused) arena
        length = arena.size
        # Reusing an existing arena is faster than creating a new one, so
        # we only reclaim space if it's large enough.
        if length < self._DISCARD_FREE_SPACE_LARGER_THAN:
            return
        blocks = self._allocated_blocks.pop(arena)
        assert not blocks
        # Remove the arena's single spanning free block from all indexes.
        del self._start_to_block[(arena, 0)]
        del self._stop_to_block[(arena, length)]
        self._arenas.remove(arena)
        seq = self._len_to_seq[length]
        seq.remove((arena, 0, length))
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

    def _malloc(self, size):
        # returns a large enough block -- it might be much larger
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            # No free block is big enough; carve a fresh arena.
            return self._new_arena(size)
        else:
            length = self._lengths[i]
            seq = self._len_to_seq[length]
            block = seq.pop()
            if not seq:
                del self._len_to_seq[length], self._lengths[i]

        # The block is no longer free: drop it from the offset indexes.
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block

    def _add_free_block(self, block):
        # make block available and try to merge with its neighbours in the arena
        (arena, start, stop) = block

        try:
            prev_block = self._stop_to_block[(arena, start)]
        except KeyError:
            pass
        else:
            # A free block ends exactly where ours starts: merge leftwards.
            start, _ = self._absorb(prev_block)

        try:
            next_block = self._start_to_block[(arena, stop)]
        except KeyError:
            pass
        else:
            # A free block starts exactly where ours ends: merge rightwards.
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        try:
            self._len_to_seq[length].append(block)
        except KeyError:
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)

        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block

    def _absorb(self, block):
        # deregister this block so it can be merged with a neighbour
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

    def _remove_allocated_block(self, block):
        arena, start, stop = block
        blocks = self._allocated_blocks[arena]
        blocks.remove((start, stop))
        if not blocks:
            # Arena is entirely free, discard it from this process
            self._discard_arena(arena)

    def _free_pending_blocks(self):
        # Free all the blocks in the pending list - called with the lock held.
        while True:
            try:
                block = self._pending_free_blocks.pop()
            except IndexError:
                break
            self._add_free_block(block)
            self._remove_allocated_block(block)

    def free(self, block):
        # free a block returned by malloc()
        # Since free() can be called asynchronously by the GC, it could happen
        # that it's called while self._lock is held: in that case,
        # self._lock.acquire() would deadlock (issue #12352). To avoid that, a
        # trylock is used instead, and if the lock can't be acquired
        # immediately, the block is added to a list of blocks to be freed
        # synchronously sometimes later from malloc() or free(), by calling
        # _free_pending_blocks() (appending and retrieving from a list is not
        # strictly thread-safe but under CPython it's atomic thanks to the GIL).
        if os.getpid() != self._lastpid:
            raise ValueError(
                "My pid ({0:n}) is not last pid {1:n}".format(
                    os.getpid(),self._lastpid))
        if not self._lock.acquire(False):
            # can't acquire the lock right now, add the block to the list of
            # pending blocks to free
            self._pending_free_blocks.append(block)
        else:
            # we hold the lock
            try:
                self._n_frees += 1
                self._free_pending_blocks()
                self._add_free_block(block)
                self._remove_allocated_block(block)
            finally:
                self._lock.release()

    def malloc(self, size):
        # return a block of right size (possibly rounded up)
        if size < 0:
            raise ValueError("Size {0:n} out of range".format(size))
        if sys.maxsize <= size:
            raise OverflowError("Size {0:n} too large".format(size))
        if os.getpid() != self._lastpid:
            self.__init__()  # reinitialize after fork
        with self._lock:
            self._n_mallocs += 1
            # allow pending blocks to be marked available
            self._free_pending_blocks()
            size = self._roundup(max(size, 1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            real_stop = start + size
            if real_stop < stop:
                # if the returned block is larger than necessary, mark
                # the remainder available
                self._add_free_block((arena, real_stop, stop))
            self._allocated_blocks[arena].add((start, real_stop))
            return (arena, start, real_stop)
#
# Class wrapping a block allocated out of a Heap -- can be inherited by child process
#
class BufferWrapper(object):
    """Wrapper around a block allocated from the process-wide Heap.

    Instances can be inherited by child processes; the block is returned
    to the heap when the wrapper is garbage collected."""

    # One heap shared by every BufferWrapper in this process.
    _heap = Heap()

    def __init__(self, size):
        # Validate the request before touching the heap.
        if size < 0:
            raise ValueError("Size {0:n} out of range".format(size))
        if sys.maxsize <= size:
            raise OverflowError("Size {0:n} too large".format(size))
        allocated = BufferWrapper._heap.malloc(size)
        self._state = (allocated, size)
        # Ensure the block is handed back to the heap on collection.
        util.Finalize(self, BufferWrapper._heap.free, args=(allocated,))

    def create_memoryview(self):
        (arena, start, stop), size = self._state
        whole_arena = memoryview(arena.buffer)
        return whole_arena[start:start + size]
PK {��\�0�"