diff options
Diffstat (limited to 'lib/python2.7/multiprocessing/connection.py')
-rw-r--r-- | lib/python2.7/multiprocessing/connection.py | 465 |
1 files changed, 0 insertions, 465 deletions
diff --git a/lib/python2.7/multiprocessing/connection.py b/lib/python2.7/multiprocessing/connection.py deleted file mode 100644 index 57bf811..0000000 --- a/lib/python2.7/multiprocessing/connection.py +++ /dev/null @@ -1,465 +0,0 @@ -# -# A higher level module for using sockets (or Windows named pipes) -# -# multiprocessing/connection.py -# -# Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. -# - -__all__ = [ 'Client', 'Listener', 'Pipe' ] - -import os -import sys -import socket -import errno -import time -import tempfile -import itertools - -import _multiprocessing -from multiprocessing import current_process, AuthenticationError -from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug -from multiprocessing.forking import duplicate, close - - -# -# -# - -BUFSIZE = 8192 -# A very generous timeout when it comes to local connections... -CONNECTION_TIMEOUT = 20. - -_mmap_counter = itertools.count() - -default_family = 'AF_INET' -families = ['AF_INET'] - -if hasattr(socket, 'AF_UNIX'): - default_family = 'AF_UNIX' - families += ['AF_UNIX'] - -if sys.platform == 'win32': - default_family = 'AF_PIPE' - families += ['AF_PIPE'] - - -def _init_timeout(timeout=CONNECTION_TIMEOUT): - return time.time() + timeout - -def _check_timeout(t): - return time.time() > t - -# -# -# - -def arbitrary_address(family): - ''' - Return an arbitrary free address for the given family - ''' - if family == 'AF_INET': - return ('localhost', 0) - elif family == 'AF_UNIX': - return tempfile.mktemp(prefix='listener-', dir=get_temp_dir()) - elif family == 'AF_PIPE': - return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' % - (os.getpid(), _mmap_counter.next())) - else: - raise ValueError('unrecognized family') - - -def address_type(address): - ''' - Return the types of the address - - This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE' - ''' - if type(address) == tuple: - return 'AF_INET' - elif type(address) is str and address.startswith('\\\\'): - return 'AF_PIPE' - elif type(address) is str: - return 'AF_UNIX' - else: - raise ValueError('address type of %r unrecognized' % address) - -# -# Public functions -# - -class Listener(object): - ''' - Returns a listener object. - - This is a wrapper for a bound socket which is 'listening' for - connections, or for a Windows named pipe. - ''' - def __init__(self, address=None, family=None, backlog=1, authkey=None): - family = family or (address and address_type(address)) \ - or default_family - address = address or arbitrary_address(family) - - if family == 'AF_PIPE': - self._listener = PipeListener(address, backlog) - else: - self._listener = SocketListener(address, family, backlog) - - if authkey is not None and not isinstance(authkey, bytes): - raise TypeError, 'authkey should be a byte string' - - self._authkey = authkey - - def accept(self): - ''' - Accept a connection on the bound socket or named pipe of `self`. - - Returns a `Connection` object. - ''' - c = self._listener.accept() - if self._authkey: - deliver_challenge(c, self._authkey) - answer_challenge(c, self._authkey) - return c - - def close(self): - ''' - Close the bound socket or named pipe of `self`. - ''' - return self._listener.close() - - address = property(lambda self: self._listener._address) - last_accepted = property(lambda self: self._listener._last_accepted) - - -def Client(address, family=None, authkey=None): - ''' - Returns a connection to the address of a `Listener` - ''' - family = family or address_type(address) - if family == 'AF_PIPE': - c = PipeClient(address) - else: - c = SocketClient(address) - - if authkey is not None and not isinstance(authkey, bytes): - raise TypeError, 'authkey should be a byte string' - - if authkey is not None: - answer_challenge(c, authkey) - deliver_challenge(c, authkey) - - return c - - -if sys.platform != 'win32': - - def Pipe(duplex=True): - ''' - Returns pair of connection objects at either end of a pipe - ''' - if duplex: - s1, s2 = socket.socketpair() - s1.setblocking(True) - s2.setblocking(True) - c1 = _multiprocessing.Connection(os.dup(s1.fileno())) - c2 = _multiprocessing.Connection(os.dup(s2.fileno())) - s1.close() - s2.close() - else: - fd1, fd2 = os.pipe() - c1 = _multiprocessing.Connection(fd1, writable=False) - c2 = _multiprocessing.Connection(fd2, readable=False) - - return c1, c2 - -else: - from _multiprocessing import win32 - - def Pipe(duplex=True): - ''' - Returns pair of connection objects at either end of a pipe - ''' - address = arbitrary_address('AF_PIPE') - if duplex: - openmode = win32.PIPE_ACCESS_DUPLEX - access = win32.GENERIC_READ | win32.GENERIC_WRITE - obsize, ibsize = BUFSIZE, BUFSIZE - else: - openmode = win32.PIPE_ACCESS_INBOUND - access = win32.GENERIC_WRITE - obsize, ibsize = 0, BUFSIZE - - h1 = win32.CreateNamedPipe( - address, openmode, - win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | - win32.PIPE_WAIT, - 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL - ) - h2 = win32.CreateFile( - address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL - ) - win32.SetNamedPipeHandleState( - h2, win32.PIPE_READMODE_MESSAGE, None, None - ) - - try: - win32.ConnectNamedPipe(h1, win32.NULL) - except WindowsError, e: - if e.args[0] != win32.ERROR_PIPE_CONNECTED: - raise - - c1 = _multiprocessing.PipeConnection(h1, writable=duplex) - c2 = _multiprocessing.PipeConnection(h2, readable=duplex) - - return c1, c2 - -# -# Definitions for connections based on sockets -# - -class SocketListener(object): - ''' - Representation of a socket which is bound to an address and listening - ''' - def __init__(self, address, family, backlog=1): - self._socket = socket.socket(getattr(socket, family)) - try: - self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self._socket.setblocking(True) - self._socket.bind(address) - self._socket.listen(backlog) - self._address = self._socket.getsockname() - except socket.error: - self._socket.close() - raise - self._family = family - self._last_accepted = None - - if family == 'AF_UNIX': - self._unlink = Finalize( - self, os.unlink, args=(address,), exitpriority=0 - ) - else: - self._unlink = None - - def accept(self): - s, self._last_accepted = self._socket.accept() - s.setblocking(True) - fd = duplicate(s.fileno()) - conn = _multiprocessing.Connection(fd) - s.close() - return conn - - def close(self): - self._socket.close() - if self._unlink is not None: - self._unlink() - - -def SocketClient(address): - ''' - Return a connection object connected to the socket given by `address` - ''' - family = address_type(address) - s = socket.socket( getattr(socket, family) ) - s.setblocking(True) - t = _init_timeout() - - while 1: - try: - s.connect(address) - except socket.error, e: - if e.args[0] != errno.ECONNREFUSED or _check_timeout(t): - debug('failed to connect to address %s', address) - raise - time.sleep(0.01) - else: - break - else: - raise - - fd = duplicate(s.fileno()) - conn = _multiprocessing.Connection(fd) - s.close() - return conn - -# -# Definitions for connections based on named pipes -# - -if sys.platform == 'win32': - - class PipeListener(object): - ''' - Representation of a named pipe - ''' - def __init__(self, address, backlog=None): - self._address = address - handle = win32.CreateNamedPipe( - address, win32.PIPE_ACCESS_DUPLEX, - win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | - win32.PIPE_WAIT, - win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, - win32.NMPWAIT_WAIT_FOREVER, win32.NULL - ) - self._handle_queue = [handle] - self._last_accepted = None - - sub_debug('listener created with address=%r', self._address) - - self.close = Finalize( - self, PipeListener._finalize_pipe_listener, - args=(self._handle_queue, self._address), exitpriority=0 - ) - - def accept(self): - newhandle = win32.CreateNamedPipe( - self._address, win32.PIPE_ACCESS_DUPLEX, - win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | - win32.PIPE_WAIT, - win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, - win32.NMPWAIT_WAIT_FOREVER, win32.NULL - ) - self._handle_queue.append(newhandle) - handle = self._handle_queue.pop(0) - try: - win32.ConnectNamedPipe(handle, win32.NULL) - except WindowsError, e: - # ERROR_NO_DATA can occur if a client has already connected, - # written data and then disconnected -- see Issue 14725. - if e.args[0] not in (win32.ERROR_PIPE_CONNECTED, - win32.ERROR_NO_DATA): - raise - return _multiprocessing.PipeConnection(handle) - - @staticmethod - def _finalize_pipe_listener(queue, address): - sub_debug('closing listener with address=%r', address) - for handle in queue: - close(handle) - - def PipeClient(address): - ''' - Return a connection object connected to the pipe given by `address` - ''' - t = _init_timeout() - while 1: - try: - win32.WaitNamedPipe(address, 1000) - h = win32.CreateFile( - address, win32.GENERIC_READ | win32.GENERIC_WRITE, - 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL - ) - except WindowsError, e: - if e.args[0] not in (win32.ERROR_SEM_TIMEOUT, - win32.ERROR_PIPE_BUSY) or _check_timeout(t): - raise - else: - break - else: - raise - - win32.SetNamedPipeHandleState( - h, win32.PIPE_READMODE_MESSAGE, None, None - ) - return _multiprocessing.PipeConnection(h) - -# -# Authentication stuff -# - -MESSAGE_LENGTH = 20 - -CHALLENGE = b'#CHALLENGE#' -WELCOME = b'#WELCOME#' -FAILURE = b'#FAILURE#' - -def deliver_challenge(connection, authkey): - import hmac - assert isinstance(authkey, bytes) - message = os.urandom(MESSAGE_LENGTH) - connection.send_bytes(CHALLENGE + message) - digest = hmac.new(authkey, message).digest() - response = connection.recv_bytes(256) # reject large message - if response == digest: - connection.send_bytes(WELCOME) - else: - connection.send_bytes(FAILURE) - raise AuthenticationError('digest received was wrong') - -def answer_challenge(connection, authkey): - import hmac - assert isinstance(authkey, bytes) - message = connection.recv_bytes(256) # reject large message - assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message - message = message[len(CHALLENGE):] - digest = hmac.new(authkey, message).digest() - connection.send_bytes(digest) - response = connection.recv_bytes(256) # reject large message - if response != WELCOME: - raise AuthenticationError('digest sent was rejected') - -# -# Support for using xmlrpclib for serialization -# - -class ConnectionWrapper(object): - def __init__(self, conn, dumps, loads): - self._conn = conn - self._dumps = dumps - self._loads = loads - for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'): - obj = getattr(conn, attr) - setattr(self, attr, obj) - def send(self, obj): - s = self._dumps(obj) - self._conn.send_bytes(s) - def recv(self): - s = self._conn.recv_bytes() - return self._loads(s) - -def _xml_dumps(obj): - return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8') - -def _xml_loads(s): - (obj,), method = xmlrpclib.loads(s.decode('utf8')) - return obj - -class XmlListener(Listener): - def accept(self): - global xmlrpclib - import xmlrpclib - obj = Listener.accept(self) - return ConnectionWrapper(obj, _xml_dumps, _xml_loads) - -def XmlClient(*args, **kwds): - global xmlrpclib - import xmlrpclib - return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) |