502 lines
20 KiB
Python
502 lines
20 KiB
Python
"""Base class extended by connection adapters. This extends the
|
|
connection.Connection class to encapsulate connection behavior but still
|
|
isolate socket and low level communication.
|
|
|
|
"""
|
|
import abc
|
|
import functools
|
|
import logging
|
|
|
|
import pika.compat
|
|
import pika.exceptions
|
|
import pika.tcp_socket_opts
|
|
|
|
from pika.adapters.utils import connection_workflow, nbio_interface
|
|
from pika import connection
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class BaseConnection(connection.Connection):
|
|
"""BaseConnection class that should be extended by connection adapters.
|
|
|
|
This class abstracts I/O loop and transport services from pika core.
|
|
|
|
"""
|
|
|
|
def __init__(self, parameters, on_open_callback, on_open_error_callback,
|
|
on_close_callback, nbio, internal_connection_workflow):
|
|
"""Create a new instance of the Connection object.
|
|
|
|
:param None|pika.connection.Parameters parameters: Connection parameters
|
|
:param None|method on_open_callback: Method to call on connection open
|
|
:param None | method on_open_error_callback: Called if the connection
|
|
can't be established or connection establishment is interrupted by
|
|
`Connection.close()`: on_open_error_callback(Connection, exception).
|
|
:param None | method on_close_callback: Called when a previously fully
|
|
open connection is closed:
|
|
`on_close_callback(Connection, exception)`, where `exception` is
|
|
either an instance of `exceptions.ConnectionClosed` if closed by
|
|
user or broker or exception of another type that describes the cause
|
|
of connection failure.
|
|
:param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
|
|
asynchronous services
|
|
:param bool internal_connection_workflow: True for autonomous connection
|
|
establishment which is default; False for externally-managed
|
|
connection workflow via the `create_connection()` factory.
|
|
:raises: RuntimeError
|
|
:raises: ValueError
|
|
|
|
"""
|
|
if parameters and not isinstance(parameters, connection.Parameters):
|
|
raise ValueError(
|
|
'Expected instance of Parameters, not %r' % (parameters,))
|
|
|
|
self._nbio = nbio
|
|
|
|
self._connection_workflow = None # type: connection_workflow.AMQPConnectionWorkflow
|
|
self._transport = None # type: pika.adapters.utils.nbio_interface.AbstractStreamTransport
|
|
|
|
self._got_eof = False # transport indicated EOF (connection reset)
|
|
|
|
super(BaseConnection, self).__init__(
|
|
parameters,
|
|
on_open_callback,
|
|
on_open_error_callback,
|
|
on_close_callback,
|
|
internal_connection_workflow=internal_connection_workflow)
|
|
|
|
def _init_connection_state(self):
|
|
"""Initialize or reset all of our internal state variables for a given
|
|
connection. If we disconnect and reconnect, all of our state needs to
|
|
be wiped.
|
|
|
|
"""
|
|
super(BaseConnection, self)._init_connection_state()
|
|
|
|
self._connection_workflow = None
|
|
self._transport = None
|
|
self._got_eof = False
|
|
|
|
def __repr__(self):
|
|
|
|
# def get_socket_repr(sock):
|
|
# """Return socket info suitable for use in repr"""
|
|
# if sock is None:
|
|
# return None
|
|
#
|
|
# sockname = None
|
|
# peername = None
|
|
# try:
|
|
# sockname = sock.getsockname()
|
|
# except pika.compat.SOCKET_ERROR:
|
|
# # closed?
|
|
# pass
|
|
# else:
|
|
# try:
|
|
# peername = sock.getpeername()
|
|
# except pika.compat.SOCKET_ERROR:
|
|
# # not connected?
|
|
# pass
|
|
#
|
|
# return '%s->%s' % (sockname, peername)
|
|
# TODO need helpful __repr__ in transports
|
|
return ('<%s %s transport=%s params=%s>' % (
|
|
self.__class__.__name__, self._STATE_NAMES[self.connection_state],
|
|
self._transport, self.params))
|
|
|
|
@classmethod
|
|
@abc.abstractmethod
|
|
def create_connection(cls,
|
|
connection_configs,
|
|
on_done,
|
|
custom_ioloop=None,
|
|
workflow=None):
|
|
"""Asynchronously create a connection to an AMQP broker using the given
|
|
configurations. Will attempt to connect using each config in the given
|
|
order, including all compatible resolved IP addresses of the hostname
|
|
supplied in each config, until one is established or all attempts fail.
|
|
|
|
See also `_start_connection_workflow()`.
|
|
|
|
:param sequence connection_configs: A sequence of one or more
|
|
`pika.connection.Parameters`-based objects.
|
|
:param callable on_done: as defined in
|
|
`connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
|
|
:param object | None custom_ioloop: Provide a custom I/O loop that is
|
|
native to the specific adapter implementation; if None, the adapter
|
|
will use a default loop instance, which is typically a singleton.
|
|
:param connection_workflow.AbstractAMQPConnectionWorkflow | None workflow:
|
|
Pass an instance of an implementation of the
|
|
`connection_workflow.AbstractAMQPConnectionWorkflow` interface;
|
|
defaults to a `connection_workflow.AMQPConnectionWorkflow` instance
|
|
with default values for optional args.
|
|
:returns: Connection workflow instance in use. The user should limit
|
|
their interaction with this object only to it's `abort()` method.
|
|
:rtype: connection_workflow.AbstractAMQPConnectionWorkflow
|
|
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
@classmethod
|
|
def _start_connection_workflow(cls, connection_configs, connection_factory,
|
|
nbio, workflow, on_done):
|
|
"""Helper function for custom implementations of `create_connection()`.
|
|
|
|
:param sequence connection_configs: A sequence of one or more
|
|
`pika.connection.Parameters`-based objects.
|
|
:param callable connection_factory: A function that takes
|
|
`pika.connection.Parameters` as its only arg and returns a brand new
|
|
`pika.connection.Connection`-based adapter instance each time it is
|
|
called. The factory must instantiate the connection with
|
|
`internal_connection_workflow=False`.
|
|
:param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
|
|
:param connection_workflow.AbstractAMQPConnectionWorkflow | None workflow:
|
|
Pass an instance of an implementation of the
|
|
`connection_workflow.AbstractAMQPConnectionWorkflow` interface;
|
|
defaults to a `connection_workflow.AMQPConnectionWorkflow` instance
|
|
with default values for optional args.
|
|
:param callable on_done: as defined in
|
|
:py:meth:`connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
|
|
:returns: Connection workflow instance in use. The user should limit
|
|
their interaction with this object only to it's `abort()` method.
|
|
:rtype: connection_workflow.AbstractAMQPConnectionWorkflow
|
|
|
|
"""
|
|
if workflow is None:
|
|
workflow = connection_workflow.AMQPConnectionWorkflow()
|
|
LOGGER.debug('Created default connection workflow %r', workflow)
|
|
|
|
if isinstance(workflow, connection_workflow.AMQPConnectionWorkflow):
|
|
workflow.set_io_services(nbio)
|
|
|
|
def create_connector():
|
|
"""`AMQPConnector` factory."""
|
|
return connection_workflow.AMQPConnector(
|
|
lambda params: _StreamingProtocolShim(
|
|
connection_factory(params)),
|
|
nbio)
|
|
|
|
workflow.start(
|
|
connection_configs=connection_configs,
|
|
connector_factory=create_connector,
|
|
native_loop=nbio.get_native_ioloop(),
|
|
on_done=functools.partial(cls._unshim_connection_workflow_callback,
|
|
on_done))
|
|
|
|
return workflow
|
|
|
|
@property
|
|
def ioloop(self):
|
|
"""
|
|
:returns: the native I/O loop instance underlying async services selected
|
|
by user or the default selected by the specialized connection
|
|
adapter (e.g., Twisted reactor, `asyncio.SelectorEventLoop`,
|
|
`select_connection.IOLoop`, etc.)
|
|
:rtype: object
|
|
"""
|
|
return self._nbio.get_native_ioloop()
|
|
|
|
def _adapter_call_later(self, delay, callback):
|
|
"""Implement
|
|
:py:meth:`pika.connection.Connection._adapter_call_later()`.
|
|
|
|
"""
|
|
return self._nbio.call_later(delay, callback)
|
|
|
|
def _adapter_remove_timeout(self, timeout_id):
|
|
"""Implement
|
|
:py:meth:`pika.connection.Connection._adapter_remove_timeout()`.
|
|
|
|
"""
|
|
timeout_id.cancel()
|
|
|
|
def _adapter_add_callback_threadsafe(self, callback):
|
|
"""Implement
|
|
:py:meth:`pika.connection.Connection._adapter_add_callback_threadsafe()`.
|
|
|
|
"""
|
|
if not callable(callback):
|
|
raise TypeError(
|
|
'callback must be a callable, but got %r' % (callback,))
|
|
|
|
self._nbio.add_callback_threadsafe(callback)
|
|
|
|
def _adapter_connect_stream(self):
|
|
"""Initiate full-stack connection establishment asynchronously for
|
|
internally-initiated connection bring-up.
|
|
|
|
Upon failed completion, we will invoke
|
|
`Connection._on_stream_terminated()`. NOTE: On success,
|
|
the stack will be up already, so there is no corresponding callback.
|
|
|
|
"""
|
|
self._connection_workflow = connection_workflow.AMQPConnectionWorkflow(
|
|
_until_first_amqp_attempt=True)
|
|
|
|
self._connection_workflow.set_io_services(self._nbio)
|
|
|
|
def create_connector():
|
|
"""`AMQPConnector` factory"""
|
|
return connection_workflow.AMQPConnector(
|
|
lambda _params: _StreamingProtocolShim(self), self._nbio)
|
|
|
|
self._connection_workflow.start(
|
|
[self.params],
|
|
connector_factory=create_connector,
|
|
native_loop=self._nbio.get_native_ioloop(),
|
|
on_done=functools.partial(self._unshim_connection_workflow_callback,
|
|
self._on_connection_workflow_done))
|
|
|
|
@staticmethod
|
|
def _unshim_connection_workflow_callback(user_on_done, shim_or_exc):
|
|
"""
|
|
|
|
:param callable user_on_done: user's `on_done` callback as defined in
|
|
:py:meth:`connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
|
|
:param _StreamingProtocolShim | Exception shim_or_exc:
|
|
"""
|
|
result = shim_or_exc
|
|
if isinstance(result, _StreamingProtocolShim):
|
|
result = result.conn
|
|
|
|
user_on_done(result)
|
|
|
|
def _abort_connection_workflow(self):
|
|
"""Asynchronously abort connection workflow. Upon
|
|
completion, `Connection._on_stream_terminated()` will be called with None
|
|
as the error argument.
|
|
|
|
Assumption: may be called only while connection is opening.
|
|
|
|
"""
|
|
assert not self._opened, (
|
|
'_abort_connection_workflow() may be called only when '
|
|
'connection is opening.')
|
|
|
|
if self._transport is None:
|
|
# NOTE: this is possible only when user calls Connection.close() to
|
|
# interrupt internally-initiated connection establishment.
|
|
# self._connection_workflow.abort() would not call
|
|
# Connection.close() before pairing of connection with transport.
|
|
assert self._internal_connection_workflow, (
|
|
'Unexpected _abort_connection_workflow() call with '
|
|
'no transport in external connection workflow mode.')
|
|
|
|
# This will result in call to _on_connection_workflow_done() upon
|
|
# completion
|
|
self._connection_workflow.abort()
|
|
else:
|
|
# NOTE: we can't use self._connection_workflow.abort() in this case,
|
|
# because it would result in infinite recursion as we're called
|
|
# from Connection.close() and _connection_workflow.abort() calls
|
|
# Connection.close() to abort a connection that's already been
|
|
# paired with a transport. During internally-initiated connection
|
|
# establishment, AMQPConnectionWorkflow will discover that user
|
|
# aborted the connection when it receives
|
|
# pika.exceptions.ConnectionOpenAborted.
|
|
|
|
# This completes asynchronously, culminating in call to our method
|
|
# `connection_lost()`
|
|
self._transport.abort()
|
|
|
|
def _on_connection_workflow_done(self, conn_or_exc):
|
|
"""`AMQPConnectionWorkflow` completion callback.
|
|
|
|
:param BaseConnection | Exception conn_or_exc: Our own connection
|
|
instance on success; exception on failure. See
|
|
`AbstractAMQPConnectionWorkflow.start()` for details.
|
|
|
|
"""
|
|
LOGGER.debug('Full-stack connection workflow completed: %r',
|
|
conn_or_exc)
|
|
|
|
self._connection_workflow = None
|
|
|
|
# Notify protocol of failure
|
|
if isinstance(conn_or_exc, Exception):
|
|
self._transport = None
|
|
if isinstance(conn_or_exc,
|
|
connection_workflow.AMQPConnectionWorkflowAborted):
|
|
LOGGER.info('Full-stack connection workflow aborted: %r',
|
|
conn_or_exc)
|
|
# So that _handle_connection_workflow_failure() will know it's
|
|
# not a failure
|
|
conn_or_exc = None
|
|
else:
|
|
LOGGER.error('Full-stack connection workflow failed: %r',
|
|
conn_or_exc)
|
|
if (isinstance(conn_or_exc,
|
|
connection_workflow.AMQPConnectionWorkflowFailed)
|
|
and isinstance(
|
|
conn_or_exc.exceptions[-1], connection_workflow.
|
|
AMQPConnectorSocketConnectError)):
|
|
conn_or_exc = pika.exceptions.AMQPConnectionError(
|
|
conn_or_exc)
|
|
|
|
self._handle_connection_workflow_failure(conn_or_exc)
|
|
else:
|
|
# NOTE: On success, the stack will be up already, so there is no
|
|
# corresponding callback.
|
|
assert conn_or_exc is self, \
|
|
'Expected self conn={!r} from workflow, but got {!r}.'.format(
|
|
self, conn_or_exc)
|
|
|
|
def _handle_connection_workflow_failure(self, error):
|
|
"""Handle failure of self-initiated stack bring-up and call
|
|
`Connection._on_stream_terminated()` if connection is not in closed state
|
|
yet. Called by adapter layer when the full-stack connection workflow
|
|
fails.
|
|
|
|
:param Exception | None error: exception instance describing the reason
|
|
for failure or None if the connection workflow was aborted.
|
|
"""
|
|
if error is None:
|
|
LOGGER.info('Self-initiated stack bring-up aborted.')
|
|
else:
|
|
LOGGER.error('Self-initiated stack bring-up failed: %r', error)
|
|
|
|
if not self.is_closed:
|
|
self._on_stream_terminated(error)
|
|
else:
|
|
# This may happen when AMQP layer bring up was started but did not
|
|
# complete
|
|
LOGGER.debug('_handle_connection_workflow_failure(): '
|
|
'suppressing - connection already closed.')
|
|
|
|
def _adapter_disconnect_stream(self):
|
|
"""Asynchronously bring down the streaming transport layer and invoke
|
|
`Connection._on_stream_terminated()` asynchronously when complete.
|
|
|
|
"""
|
|
if not self._opened:
|
|
self._abort_connection_workflow()
|
|
else:
|
|
# This completes asynchronously, culminating in call to our method
|
|
# `connection_lost()`
|
|
self._transport.abort()
|
|
|
|
def _adapter_emit_data(self, data):
|
|
"""Take ownership of data and send it to AMQP server as soon as
|
|
possible.
|
|
|
|
:param bytes data:
|
|
|
|
"""
|
|
self._transport.write(data)
|
|
|
|
def _proto_connection_made(self, transport):
|
|
"""Introduces transport to protocol after transport is connected.
|
|
|
|
:py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.
|
|
|
|
:param nbio_interface.AbstractStreamTransport transport:
|
|
:raises Exception: Exception-based exception on error
|
|
|
|
"""
|
|
self._transport = transport
|
|
|
|
# Let connection know that stream is available
|
|
self._on_stream_connected()
|
|
|
|
def _proto_connection_lost(self, error):
|
|
"""Called upon loss or closing of TCP connection.
|
|
|
|
:py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.
|
|
|
|
NOTE: `connection_made()` and `connection_lost()` are each called just
|
|
once and in that order. All other callbacks are called between them.
|
|
|
|
:param BaseException | None error: An exception (check for
|
|
`BaseException`) indicates connection failure. None indicates that
|
|
connection was closed on this side, such as when it's aborted or
|
|
when `AbstractStreamProtocol.eof_received()` returns a falsy result.
|
|
:raises Exception: Exception-based exception on error
|
|
|
|
"""
|
|
self._transport = None
|
|
|
|
if error is None:
|
|
# Either result of `eof_received()` or abort
|
|
if self._got_eof:
|
|
error = pika.exceptions.StreamLostError(
|
|
'Transport indicated EOF')
|
|
else:
|
|
error = pika.exceptions.StreamLostError(
|
|
'Stream connection lost: {!r}'.format(error))
|
|
|
|
LOGGER.log(logging.DEBUG if error is None else logging.ERROR,
|
|
'connection_lost: %r', error)
|
|
|
|
self._on_stream_terminated(error)
|
|
|
|
def _proto_eof_received(self): # pylint: disable=R0201
|
|
"""Called after the remote peer shuts its write end of the connection.
|
|
:py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.
|
|
|
|
:returns: A falsy value (including None) will cause the transport to
|
|
close itself, resulting in an eventual `connection_lost()` call
|
|
from the transport. If a truthy value is returned, it will be the
|
|
protocol's responsibility to close/abort the transport.
|
|
:rtype: falsy|truthy
|
|
:raises Exception: Exception-based exception on error
|
|
|
|
"""
|
|
LOGGER.error('Transport indicated EOF.')
|
|
|
|
self._got_eof = True
|
|
|
|
# This is how a reset connection will typically present itself
|
|
# when we have nothing to send to the server over plaintext stream.
|
|
#
|
|
# Have transport tear down the connection and invoke our
|
|
# `connection_lost` method
|
|
return False
|
|
|
|
def _proto_data_received(self, data):
|
|
"""Called to deliver incoming data from the server to the protocol.
|
|
|
|
:py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.
|
|
|
|
:param data: Non-empty data bytes.
|
|
:raises Exception: Exception-based exception on error
|
|
|
|
"""
|
|
self._on_data_available(data)
|
|
|
|
|
|
class _StreamingProtocolShim(nbio_interface.AbstractStreamProtocol):
|
|
"""Shim for callbacks from transport so that we BaseConnection can
|
|
delegate to private methods, thus avoiding contamination of API with
|
|
methods that look public, but aren't.
|
|
|
|
"""
|
|
|
|
# Override AbstractStreamProtocol abstract methods to enable instantiation
|
|
connection_made = None
|
|
connection_lost = None
|
|
eof_received = None
|
|
data_received = None
|
|
|
|
def __init__(self, conn):
|
|
"""
|
|
:param BaseConnection conn:
|
|
"""
|
|
self.conn = conn
|
|
# pylint: disable=W0212
|
|
self.connection_made = conn._proto_connection_made
|
|
self.connection_lost = conn._proto_connection_lost
|
|
self.eof_received = conn._proto_eof_received
|
|
self.data_received = conn._proto_data_received
|
|
|
|
def __getattr__(self, attr):
|
|
"""Proxy inexistent attribute requests to our connection instance
|
|
so that AMQPConnectionWorkflow/AMQPConnector may treat the shim as an
|
|
actual connection.
|
|
|
|
"""
|
|
return getattr(self.conn, attr)
|
|
|
|
def __repr__(self):
|
|
return '{}: {!r}'.format(self.__class__.__name__, self.conn)
|