diff options
Diffstat (limited to 'google/api_core')
-rw-r--r-- | google/api_core/__init__.py | 23 | ||||
-rw-r--r-- | google/api_core/datetime_helpers.py | 22 | ||||
-rw-r--r-- | google/api_core/exceptions.py | 443 | ||||
-rw-r--r-- | google/api_core/future/__init__.py | 21 | ||||
-rw-r--r-- | google/api_core/future/_helpers.py | 39 | ||||
-rw-r--r-- | google/api_core/future/base.py | 67 | ||||
-rw-r--r-- | google/api_core/future/polling.py | 165 | ||||
-rw-r--r-- | google/api_core/gapic_v1/__init__.py | 21 | ||||
-rw-r--r-- | google/api_core/gapic_v1/config.py | 169 | ||||
-rw-r--r-- | google/api_core/gapic_v1/method.py | 292 | ||||
-rw-r--r-- | google/api_core/general_helpers.py | 32 | ||||
-rw-r--r-- | google/api_core/grpc_helpers.py | 134 | ||||
-rw-r--r-- | google/api_core/operation.py | 297 | ||||
-rw-r--r-- | google/api_core/operations_v1/__init__.py | 21 | ||||
-rw-r--r-- | google/api_core/operations_v1/operations_client.py | 271 | ||||
-rw-r--r-- | google/api_core/operations_v1/operations_client_config.py | 62 | ||||
-rw-r--r-- | google/api_core/page_iterator.py | 522 | ||||
-rw-r--r-- | google/api_core/path_template.py | 198 | ||||
-rw-r--r-- | google/api_core/protobuf_helpers.py | 38 | ||||
-rw-r--r-- | google/api_core/retry.py | 323 | ||||
-rw-r--r-- | google/api_core/timeout.py | 215 |
21 files changed, 3375 insertions, 0 deletions
diff --git a/google/api_core/__init__.py b/google/api_core/__init__.py new file mode 100644 index 0000000..9c1ab88 --- /dev/null +++ b/google/api_core/__init__.py @@ -0,0 +1,23 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Google API Core. + +This package contains common code and utilties used by Google client libraries. +""" + +from pkg_resources import get_distribution + + +__version__ = get_distribution('google-api-core').version diff --git a/google/api_core/datetime_helpers.py b/google/api_core/datetime_helpers.py new file mode 100644 index 0000000..cfc817b --- /dev/null +++ b/google/api_core/datetime_helpers.py @@ -0,0 +1,22 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Helpers for :mod:`datetime`.""" + +import datetime + + +def utcnow(): + """A :meth:`datetime.datetime.utcnow()` alias to allow mocking in tests.""" + return datetime.datetime.utcnow() diff --git a/google/api_core/exceptions.py b/google/api_core/exceptions.py new file mode 100644 index 0000000..eb1d548 --- /dev/null +++ b/google/api_core/exceptions.py @@ -0,0 +1,443 @@ +# Copyright 2014 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Exceptions raised by Google API core & clients. + +This module provides base classes for all errors raised by libraries based +on :mod:`google.api_core`, including both HTTP and gRPC clients. +""" + +from __future__ import absolute_import +from __future__ import unicode_literals + +import six +from six.moves import http_client + +try: + import grpc +except ImportError: # pragma: NO COVER + grpc = None + +# Lookup tables for mapping exceptions from HTTP and gRPC transports. +# Populated by _APICallErrorMeta +_HTTP_CODE_TO_EXCEPTION = {} +_GRPC_CODE_TO_EXCEPTION = {} + + +class GoogleAPIError(Exception): + """Base class for all exceptions raised by Google API Clients.""" + pass + + +@six.python_2_unicode_compatible +class RetryError(GoogleAPIError): + """Raised when a function has exhausted all of its available retries. + + Args: + message (str): The exception message. + cause (Exception): The last exception raised when retring the + function. + """ + def __init__(self, message, cause): + super(RetryError, self).__init__(message) + self.message = message + self._cause = cause + + @property + def cause(self): + """The last exception raised when retrying the function.""" + return self._cause + + def __str__(self): + return '{}, last exception: {}'.format(self.message, self.cause) + + +class _GoogleAPICallErrorMeta(type): + """Metaclass for registering GoogleAPICallError subclasses.""" + def __new__(mcs, name, bases, class_dict): + cls = type.__new__(mcs, name, bases, class_dict) + if cls.code is not None: + _HTTP_CODE_TO_EXCEPTION.setdefault(cls.code, cls) + if cls.grpc_status_code is not None: + _GRPC_CODE_TO_EXCEPTION.setdefault(cls.grpc_status_code, cls) + return cls + + +@six.python_2_unicode_compatible +@six.add_metaclass(_GoogleAPICallErrorMeta) +class GoogleAPICallError(GoogleAPIError): + """Base class for exceptions raised by calling API methods. + + Args: + message (str): The exception message. + errors (Sequence[Any]): An optional list of error details. + response (Union[requests.Request, grpc.Call]): The response or + gRPC call metadata. + """ + + code = None + """Optional[int]: The HTTP status code associated with this error. + + This may be ``None`` if the exception does not have a direct mapping + to an HTTP error. + + See http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html + """ + + grpc_status_code = None + """Optional[grpc.StatusCode]: The gRPC status code associated with this + error. + + This may be ``None`` if the exception does not match up to a gRPC error. + """ + + def __init__(self, message, errors=(), response=None): + super(GoogleAPICallError, self).__init__(message) + self.message = message + """str: The exception message.""" + self._errors = errors + self._response = response + + def __str__(self): + return '{} {}'.format(self.code, self.message) + + @property + def errors(self): + """Detailed error information. + + Returns: + Sequence[Any]: A list of additional error details. + """ + return list(self._errors) + + @property + def response(self): + """Optional[Union[requests.Request, grpc.Call]]: The response or + gRPC call metadata.""" + return self._response + + +class Redirection(GoogleAPICallError): + """Base class for for all redirection (HTTP 3xx) responses.""" + + +class MovedPermanently(Redirection): + """Exception mapping a ``301 Moved Permanently`` response.""" + code = http_client.MOVED_PERMANENTLY + + +class NotModified(Redirection): + """Exception mapping a ``304 Not Modified`` response.""" + code = http_client.NOT_MODIFIED + + +class TemporaryRedirect(Redirection): + """Exception mapping a ``307 Temporary Redirect`` response.""" + code = http_client.TEMPORARY_REDIRECT + + +class ResumeIncomplete(Redirection): + """Exception mapping a ``308 Resume Incomplete`` response. + + .. note:: :attr:`http_client.PERMANENT_REDIRECT` is ``308``, but Google + APIs differ in their use of this status code. + """ + code = 308 + + +class ClientError(GoogleAPICallError): + """Base class for all client error (HTTP 4xx) responses.""" + + +class BadRequest(ClientError): + """Exception mapping a ``400 Bad Request`` response.""" + code = http_client.BAD_REQUEST + + +class InvalidArgument(BadRequest): + """Exception mapping a :attr:`grpc.StatusCode.INVALID_ARGUMENT` error.""" + grpc_status_code = ( + grpc.StatusCode.INVALID_ARGUMENT if grpc is not None else None) + + +class FailedPrecondition(BadRequest): + """Exception mapping a :attr:`grpc.StatusCode.FAILED_PRECONDITION` + error.""" + grpc_status_code = ( + grpc.StatusCode.FAILED_PRECONDITION if grpc is not None else None) + + +class OutOfRange(BadRequest): + """Exception mapping a :attr:`grpc.StatusCode.OUT_OF_RANGE` error.""" + grpc_status_code = ( + grpc.StatusCode.OUT_OF_RANGE if grpc is not None else None) + + +class Unauthorized(ClientError): + """Exception mapping a ``401 Unauthorized`` response.""" + code = http_client.UNAUTHORIZED + + +class Unauthenticated(Unauthorized): + """Exception mapping a :attr:`grpc.StatusCode.UNAUTHENTICATED` error.""" + grpc_status_code = ( + grpc.StatusCode.UNAUTHENTICATED if grpc is not None else None) + + +class Forbidden(ClientError): + """Exception mapping a ``403 Forbidden`` response.""" + code = http_client.FORBIDDEN + + +class PermissionDenied(Forbidden): + """Exception mapping a :attr:`grpc.StatusCode.PERMISSION_DENIED` error.""" + grpc_status_code = ( + grpc.StatusCode.PERMISSION_DENIED if grpc is not None else None) + + +class NotFound(ClientError): + """Exception mapping a ``404 Not Found`` response or a + :attr:`grpc.StatusCode.NOT_FOUND` error.""" + code = http_client.NOT_FOUND + grpc_status_code = ( + grpc.StatusCode.NOT_FOUND if grpc is not None else None) + + +class MethodNotAllowed(ClientError): + """Exception mapping a ``405 Method Not Allowed`` response.""" + code = http_client.METHOD_NOT_ALLOWED + + +class Conflict(ClientError): + """Exception mapping a ``409 Conflict`` response.""" + code = http_client.CONFLICT + + +class AlreadyExists(Conflict): + """Exception mapping a :attr:`grpc.StatusCode.ALREADY_EXISTS` error.""" + grpc_status_code = ( + grpc.StatusCode.ALREADY_EXISTS if grpc is not None else None) + + +class Aborted(Conflict): + """Exception mapping a :attr:`grpc.StatusCode.ABORTED` error.""" + grpc_status_code = ( + grpc.StatusCode.ABORTED if grpc is not None else None) + + +class LengthRequired(ClientError): + """Exception mapping a ``411 Length Required`` response.""" + code = http_client.LENGTH_REQUIRED + + +class PreconditionFailed(ClientError): + """Exception mapping a ``412 Precondition Failed`` response.""" + code = http_client.PRECONDITION_FAILED + + +class RequestRangeNotSatisfiable(ClientError): + """Exception mapping a ``416 Request Range Not Satisfiable`` response.""" + code = http_client.REQUESTED_RANGE_NOT_SATISFIABLE + + +class TooManyRequests(ClientError): + """Exception mapping a ``429 Too Many Requests`` response.""" + # http_client does not define a constant for this in Python 2. + code = 429 + + +class ResourceExhausted(TooManyRequests): + """Exception mapping a :attr:`grpc.StatusCode.RESOURCE_EXHAUSTED` error.""" + grpc_status_code = ( + grpc.StatusCode.RESOURCE_EXHAUSTED if grpc is not None else None) + + +class Cancelled(ClientError): + """Exception mapping a :attr:`grpc.StatusCode.CANCELLED` error.""" + # This maps to HTTP status code 499. See + # https://github.com/googleapis/googleapis/blob/master/google/rpc\ + # /code.proto + code = 499 + grpc_status_code = grpc.StatusCode.CANCELLED if grpc is not None else None + + +class ServerError(GoogleAPICallError): + """Base for 5xx responses.""" + + +class InternalServerError(ServerError): + """Exception mapping a ``500 Internal Server Error`` response. or a + :attr:`grpc.StatusCode.INTERNAL` error.""" + code = http_client.INTERNAL_SERVER_ERROR + grpc_status_code = grpc.StatusCode.INTERNAL if grpc is not None else None + + +class Unknown(ServerError): + """Exception mapping a :attr:`grpc.StatusCode.UNKNOWN` error.""" + grpc_status_code = grpc.StatusCode.UNKNOWN if grpc is not None else None + + +class DataLoss(ServerError): + """Exception mapping a :attr:`grpc.StatusCode.DATA_LOSS` error.""" + grpc_status_code = grpc.StatusCode.DATA_LOSS if grpc is not None else None + + +class MethodNotImplemented(ServerError): + """Exception mapping a ``501 Not Implemented`` response or a + :attr:`grpc.StatusCode.UNIMPLEMENTED` error.""" + code = http_client.NOT_IMPLEMENTED + grpc_status_code = ( + grpc.StatusCode.UNIMPLEMENTED if grpc is not None else None) + + +class BadGateway(ServerError): + """Exception mapping a ``502 Bad Gateway`` response.""" + code = http_client.BAD_GATEWAY + + +class ServiceUnavailable(ServerError): + """Exception mapping a ``503 Service Unavailable`` response or a + :attr:`grpc.StatusCode.UNAVAILABLE` error.""" + code = http_client.SERVICE_UNAVAILABLE + grpc_status_code = ( + grpc.StatusCode.UNAVAILABLE if grpc is not None else None) + + +class GatewayTimeout(ServerError): + """Exception mapping a ``504 Gateway Timeout`` response.""" + code = http_client.GATEWAY_TIMEOUT + + +class DeadlineExceeded(GatewayTimeout): + """Exception mapping a :attr:`grpc.StatusCode.DEADLINE_EXCEEDED` error.""" + grpc_status_code = ( + grpc.StatusCode.DEADLINE_EXCEEDED if grpc is not None else None) + + +def exception_class_for_http_status(status_code): + """Return the exception class for a specific HTTP status code. + + Args: + status_code (int): The HTTP status code. + + Returns: + :func:`type`: the appropriate subclass of :class:`GoogleAPICallError`. + """ + return _HTTP_CODE_TO_EXCEPTION.get(status_code, GoogleAPICallError) + + +def from_http_status(status_code, message, **kwargs): + """Create a :class:`GoogleAPICallError` from an HTTP status code. + + Args: + status_code (int): The HTTP status code. + message (str): The exception message. + kwargs: Additional arguments passed to the :class:`GoogleAPICallError` + constructor. + + Returns: + GoogleAPICallError: An instance of the appropriate subclass of + :class:`GoogleAPICallError`. + """ + error_class = exception_class_for_http_status(status_code) + error = error_class(message, **kwargs) + + if error.code is None: + error.code = status_code + + return error + + +def from_http_response(response): + """Create a :class:`GoogleAPICallError` from a :class:`requests.Response`. + + Args: + response (requests.Response): The HTTP response. + + Returns: + GoogleAPICallError: An instance of the appropriate subclass of + :class:`GoogleAPICallError`, with the message and errors populated + from the response. + """ + try: + payload = response.json() + except ValueError: + payload = {'error': {'message': response.text or 'unknown error'}} + + error_message = payload.get('error', {}).get('message', 'unknown error') + errors = payload.get('error', {}).get('errors', ()) + + message = '{method} {url}: {error}'.format( + method=response.request.method, + url=response.request.url, + error=error_message) + + exception = from_http_status( + response.status_code, message, errors=errors, response=response) + return exception + + +def exception_class_for_grpc_status(status_code): + """Return the exception class for a specific :class:`grpc.StatusCode`. + + Args: + status_code (grpc.StatusCode): The gRPC status code. + + Returns: + :func:`type`: the appropriate subclass of :class:`GoogleAPICallError`. + """ + return _GRPC_CODE_TO_EXCEPTION.get(status_code, GoogleAPICallError) + + +def from_grpc_status(status_code, message, **kwargs): + """Create a :class:`GoogleAPICallError` from a :class:`grpc.StatusCode`. + + Args: + status_code (grpc.StatusCode): The gRPC status code. + message (str): The exception message. + kwargs: Additional arguments passed to the :class:`GoogleAPICallError` + constructor. + + Returns: + GoogleAPICallError: An instance of the appropriate subclass of + :class:`GoogleAPICallError`. + """ + error_class = exception_class_for_grpc_status(status_code) + error = error_class(message, **kwargs) + + if error.grpc_status_code is None: + error.grpc_status_code = status_code + + return error + + +def from_grpc_error(rpc_exc): + """Create a :class:`GoogleAPICallError` from a :class:`grpc.RpcError`. + + Args: + rpc_exc (grpc.RpcError): The gRPC error. + + Returns: + GoogleAPICallError: An instance of the appropriate subclass of + :class:`GoogleAPICallError`. + """ + if isinstance(rpc_exc, grpc.Call): + return from_grpc_status( + rpc_exc.code(), + rpc_exc.details(), + errors=(rpc_exc,), + response=rpc_exc) + else: + return GoogleAPICallError( + str(rpc_exc), errors=(rpc_exc,), response=rpc_exc) diff --git a/google/api_core/future/__init__.py b/google/api_core/future/__init__.py new file mode 100644 index 0000000..82ab739 --- /dev/null +++ b/google/api_core/future/__init__.py @@ -0,0 +1,21 @@ +# Copyright 2017, Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Futures for dealing with asynchronous operations.""" + +from google.api_core.future.base import Future + +__all__ = [ + 'Future', +] diff --git a/google/api_core/future/_helpers.py b/google/api_core/future/_helpers.py new file mode 100644 index 0000000..933d0b8 --- /dev/null +++ b/google/api_core/future/_helpers.py @@ -0,0 +1,39 @@ +# Copyright 2017, Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Private helpers for futures.""" + +import logging +import threading + + +_LOGGER = logging.getLogger(__name__) + + +def start_daemon_thread(*args, **kwargs): + """Starts a thread and marks it as a daemon thread.""" + thread = threading.Thread(*args, **kwargs) + thread.daemon = True + thread.start() + return thread + + +def safe_invoke_callback(callback, *args, **kwargs): + """Invoke a callback, swallowing and logging any exceptions.""" + # pylint: disable=bare-except + # We intentionally want to swallow all exceptions. + try: + return callback(*args, **kwargs) + except: + _LOGGER.exception('Error while executing Future callback.') diff --git a/google/api_core/future/base.py b/google/api_core/future/base.py new file mode 100644 index 0000000..2439136 --- /dev/null +++ b/google/api_core/future/base.py @@ -0,0 +1,67 @@ +# Copyright 2017, Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Abstract and helper bases for Future implementations.""" + +import abc + +import six + + +@six.add_metaclass(abc.ABCMeta) +class Future(object): + # pylint: disable=missing-docstring + # We inherit the interfaces here from concurrent.futures. + + """Future interface. + + This interface is based on :class:`concurrent.futures.Future`. + """ + + @abc.abstractmethod + def cancel(self): + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): + raise NotImplementedError() + + @abc.abstractmethod + def running(self): + raise NotImplementedError() + + @abc.abstractmethod + def done(self): + raise NotImplementedError() + + @abc.abstractmethod + def result(self, timeout=None): + raise NotImplementedError() + + @abc.abstractmethod + def exception(self, timeout=None): + raise NotImplementedError() + + @abc.abstractmethod + def add_done_callback(self, fn): + # pylint: disable=invalid-name + raise NotImplementedError() + + @abc.abstractmethod + def set_result(self, result): + raise NotImplementedError() + + @abc.abstractmethod + def set_exception(self, exception): + raise NotImplementedError() diff --git a/google/api_core/future/polling.py b/google/api_core/future/polling.py new file mode 100644 index 0000000..eea9624 --- /dev/null +++ b/google/api_core/future/polling.py @@ -0,0 +1,165 @@ +# Copyright 2017, Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Abstract and helper bases for Future implementations.""" + +import abc +import concurrent.futures + +from google.api_core import exceptions +from google.api_core import retry +from google.api_core.future import _helpers +from google.api_core.future import base + + +class _OperationNotComplete(Exception): + """Private exception used for polling via retry.""" + pass + + +class PollingFuture(base.Future): + """A Future that needs to poll some service to check its status. + + The :meth:`done` method should be implemented by subclasses. The polling + behavior will repeatedly call ``done`` until it returns True. + + .. note: Privacy here is intended to prevent the final class from + overexposing, not to prevent subclasses from accessing methods. + """ + def __init__(self): + super(PollingFuture, self).__init__() + self._result = None + self._exception = None + self._result_set = False + """bool: Set to True when the result has been set via set_result or + set_exception.""" + self._polling_thread = None + self._done_callbacks = [] + + @abc.abstractmethod + def done(self): + """Checks to see if the operation is complete. + + Returns: + bool: True if the operation is complete, False otherwise. + """ + # pylint: disable=redundant-returns-doc, missing-raises-doc + raise NotImplementedError() + + def _done_or_raise(self): + """Check if the future is done and raise if it's not.""" + if not self.done(): + raise _OperationNotComplete() + + def running(self): + """True if the operation is currently running.""" + return not self.done() + + def _blocking_poll(self, timeout=None): + """Poll and wait for the Future to be resolved. + + Args: + timeout (int): + How long (in seconds) to wait for the operation to complete. + If None, wait indefinitely. + """ + if self._result_set: + return + + retry_ = retry.Retry( + predicate=retry.if_exception_type(_OperationNotComplete), + deadline=timeout) + + try: + retry_(self._done_or_raise)() + except exceptions.RetryError: + raise concurrent.futures.TimeoutError( + 'Operation did not complete within the designated ' + 'timeout.') + + def result(self, timeout=None): + """Get the result of the operation, blocking if necessary. + + Args: + timeout (int): + How long (in seconds) to wait for the operation to complete. + If None, wait indefinitely. + + Returns: + google.protobuf.Message: The Operation's result. + + Raises: + google.gax.GaxError: If the operation errors or if the timeout is + reached before the operation completes. + """ + self._blocking_poll(timeout=timeout) + + if self._exception is not None: + # pylint: disable=raising-bad-type + # Pylint doesn't recognize that this is valid in this case. + raise self._exception + + return self._result + + def exception(self, timeout=None): + """Get the exception from the operation, blocking if necessary. + + Args: + timeout (int): How long to wait for the operation to complete. + If None, wait indefinitely. + + Returns: + Optional[google.gax.GaxError]: The operation's error. + """ + self._blocking_poll() + return self._exception + + def add_done_callback(self, fn): + """Add a callback to be executed when the operation is complete. + + If the operation is not already complete, this will start a helper + thread to poll for the status of the operation in the background. + + Args: + fn (Callable[Future]): The callback to execute when the operation + is complete. + """ + if self._result_set: + _helpers.safe_invoke_callback(fn, self) + return + + self._done_callbacks.append(fn) + + if self._polling_thread is None: + # The polling thread will exit on its own as soon as the operation + # is done. + self._polling_thread = _helpers.start_daemon_thread( + target=self._blocking_poll) + + def _invoke_callbacks(self, *args, **kwargs): + """Invoke all done callbacks.""" + for callback in self._done_callbacks: + _helpers.safe_invoke_callback(callback, *args, **kwargs) + + def set_result(self, result): + """Set the Future's result.""" + self._result = result + self._result_set = True + self._invoke_callbacks(self) + + def set_exception(self, exception): + """Set the Future's exception.""" + self._exception = exception + self._result_set = True + self._invoke_callbacks(self) diff --git a/google/api_core/gapic_v1/__init__.py b/google/api_core/gapic_v1/__init__.py new file mode 100644 index 0000000..7a588cf --- /dev/null +++ b/google/api_core/gapic_v1/__init__.py @@ -0,0 +1,21 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.api_core.gapic_v1 import config +from google.api_core.gapic_v1 import method + +__all__ = [ + 'config', + 'method', +] diff --git a/google/api_core/gapic_v1/config.py b/google/api_core/gapic_v1/config.py new file mode 100644 index 0000000..376e4f9 --- /dev/null +++ b/google/api_core/gapic_v1/config.py @@ -0,0 +1,169 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Helpers for loading gapic configuration data. + +The Google API generator creates supplementary configuration for each RPC +method to tell the client library how to deal with retries and timeouts. +""" + +import collections + +import grpc +import six + +from google.api_core import exceptions +from google.api_core import retry +from google.api_core import timeout + + +_MILLIS_PER_SECOND = 1000.0 + + +def _exception_class_for_grpc_status_name(name): + """Returns the Google API exception class for a gRPC error code name. + + Args: + name (str): The name of the gRPC status code, for example, + ``UNAVAILABLE``. + + Returns: + :func:`type`: The appropriate subclass of + :class:`google.api_core.exceptions.GoogleAPICallError`. + """ + return exceptions.exception_class_for_grpc_status( + getattr(grpc.StatusCode, name)) + + +def _retry_from_retry_config(retry_params, retry_codes): + """Creates a Retry object given a gapic retry configuration. + + Args: + retry_params (dict): The retry parameter values, for example:: + + { + "initial_retry_delay_millis": 1000, + "retry_delay_multiplier": 2.5, + "max_retry_delay_millis": 120000, + "initial_rpc_timeout_millis": 120000, + "rpc_timeout_multiplier": 1.0, + "max_rpc_timeout_millis": 120000, + "total_timeout_millis": 600000 + } + + retry_codes (sequence[str]): The list of retryable gRPC error code + names. + + Returns: + google.api_core.retry.Retry: The default retry object for the method. + """ + exception_classes = [ + _exception_class_for_grpc_status_name(code) for code in retry_codes] + return retry.Retry( + retry.if_exception_type(*exception_classes), + initial=( + retry_params['initial_retry_delay_millis'] / _MILLIS_PER_SECOND), + maximum=( + retry_params['max_retry_delay_millis'] / _MILLIS_PER_SECOND), + multiplier=retry_params['retry_delay_multiplier'], + deadline=retry_params['total_timeout_millis'] / _MILLIS_PER_SECOND) + + +def _timeout_from_retry_config(retry_params): + """Creates a ExponentialTimeout object given a gapic retry configuration. + + Args: + retry_params (dict): The retry parameter values, for example:: + + { + "initial_retry_delay_millis": 1000, + "retry_delay_multiplier": 2.5, + "max_retry_delay_millis": 120000, + "initial_rpc_timeout_millis": 120000, + "rpc_timeout_multiplier": 1.0, + "max_rpc_timeout_millis": 120000, + "total_timeout_millis": 600000 + } + + Returns: + google.api_core.retry.ExponentialTimeout: The default time object for + the method. + """ + return timeout.ExponentialTimeout( + initial=( + retry_params['initial_rpc_timeout_millis'] / _MILLIS_PER_SECOND), + maximum=( + retry_params['max_rpc_timeout_millis'] / _MILLIS_PER_SECOND), + multiplier=retry_params['rpc_timeout_multiplier'], + deadline=( + retry_params['total_timeout_millis'] / _MILLIS_PER_SECOND)) + + +MethodConfig = collections.namedtuple('MethodConfig', ['retry', 'timeout']) + + +def parse_method_configs(interface_config): + """Creates default retry and timeout objects for each method in a gapic + interface config. + + Args: + interface_config (Mapping): The interface config section of the full + gapic library config. For example, If the full configuration has + an interface named ``google.example.v1.ExampleService`` you would + pass in just that interface's configuration, for example + ``gapic_config['interfaces']['google.example.v1.ExampleService']``. + + Returns: + Mapping[str, MethodConfig]: A mapping of RPC method names to their + configuration. + """ + # Grab all the retry codes + retry_codes_map = { + name: retry_codes + for name, retry_codes + in six.iteritems(interface_config.get('retry_codes', {})) + } + + # Grab all of the retry params + retry_params_map = { + name: retry_params + for name, retry_params + in six.iteritems(interface_config.get('retry_params', {})) + } + + # Iterate through all the API methods and create a flat MethodConfig + # instance for each one. + method_configs = {} + + for method_name, method_params in six.iteritems( + interface_config.get('methods', {})): + retry_params_name = method_params.get('retry_params_name') + + if retry_params_name is not None: + retry_params = retry_params_map[retry_params_name] + retry_ = _retry_from_retry_config( + retry_params, + retry_codes_map[method_params['retry_codes_name']]) + timeout_ = _timeout_from_retry_config(retry_params) + + # No retry config, so this is a non-retryable method. + else: + retry_ = None + timeout_ = timeout.ConstantTimeout( + method_params['timeout_millis'] / _MILLIS_PER_SECOND) + + method_configs[method_name] = MethodConfig( + retry=retry_, timeout=timeout_) + + return method_configs diff --git a/google/api_core/gapic_v1/method.py b/google/api_core/gapic_v1/method.py new file mode 100644 index 0000000..88e5a57 --- /dev/null +++ b/google/api_core/gapic_v1/method.py @@ -0,0 +1,292 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Helpers for wrapping low-level gRPC methods with common functionality. + +This is used by gapic clients to provide common error mapping, retry, timeout, +pagination, and long-running operations to gRPC methods. +""" + +import functools +import platform + +import pkg_resources +import six + +from google.api_core import general_helpers +from google.api_core import grpc_helpers +from google.api_core import page_iterator +from google.api_core import timeout + +_PY_VERSION = platform.python_version() +_GRPC_VERSION = pkg_resources.get_distribution('grpcio').version +_API_CORE_VERSION = pkg_resources.get_distribution('google-api-core').version +METRICS_METADATA_KEY = 'x-goog-api-client' +USE_DEFAULT_METADATA = object() +DEFAULT = object() +"""Sentinel value indicating that a retry or timeout argument was unspecified, +so the default should be used.""" + + +def _is_not_none_or_false(value): + return value is not None and value is not False + + +def _apply_decorators(func, decorators): + """Apply a list of decorators to a given function. + + ``decorators`` may contain items that are ``None`` or ``False`` which will + be ignored. + """ + decorators = filter(_is_not_none_or_false, reversed(decorators)) + + for decorator in decorators: + func = decorator(func) + + return func + + +def _prepare_metadata(metadata): + """Transforms metadata to gRPC format and adds global metrics. + + Args: + metadata (Mapping[str, str]): Any current metadata. + + Returns: + Sequence[Tuple(str, str)]: The gRPC-friendly metadata keys and values. + """ + client_metadata = 'api-core/{} gl-python/{} grpc/{}'.format( + _API_CORE_VERSION, _PY_VERSION, _GRPC_VERSION) + + # Merge this with any existing metric metadata. + if METRICS_METADATA_KEY in metadata: + client_metadata = '{} {}'.format( + client_metadata, metadata[METRICS_METADATA_KEY]) + + metadata[METRICS_METADATA_KEY] = client_metadata + + return list(metadata.items()) + + +def _determine_timeout(default_timeout, specified_timeout, retry): + """Determines how timeout should be applied to a wrapped method. + + Args: + default_timeout (Optional[Timeout]): The default timeout specified + at method creation time. + specified_timeout (Optional[Timeout]): The timeout specified at + invocation time. If :attr:`DEFAULT`, this will be set to + the ``default_timeout``. + retry (Optional[Retry]): The retry specified at invocation time. + + Returns: + Optional[Timeout]: The timeout to apply to the method or ``None``. + """ + if specified_timeout is DEFAULT: + specified_timeout = default_timeout + + if specified_timeout is default_timeout: + # If timeout is the default and the default timeout is exponential and + # a non-default retry is specified, make sure the timeout's deadline + # matches the retry's. This handles the case where the user leaves + # the timeout default but specifies a lower deadline via the retry. + if (retry and retry is not DEFAULT + and isinstance(default_timeout, timeout.ExponentialTimeout)): + return default_timeout.with_deadline(retry._deadline) + else: + return default_timeout + + # If timeout is specified as a number instead of a Timeout instance, + # convert it to a ConstantTimeout. + if isinstance(specified_timeout, (int, float)): + return timeout.ConstantTimeout(specified_timeout) + else: + return specified_timeout + + +class _GapicCallable(object): + """Callable that applies retry, timeout, and metadata logic. + + Args: + target (Callable): The low-level RPC method. + retry (google.api_core.retry.Retry): The default retry for the + callable. If ``None``, this callable will not retry by default + timeout (google.api_core.timeout.Timeout): The default timeout + for the callable. If ``None``, this callable will not specify + a timeout argument to the low-level RPC method by default. + metadata (Optional[Sequence[Tuple[str, str]]]): gRPC call metadata + that's passed to the low-level RPC method. If ``None``, no metadata + will be passed to the low-level RPC method. + """ + + def __init__(self, target, retry, timeout, metadata): + self._target = target + self._retry = retry + self._timeout = timeout + self._metadata = metadata + + def __call__(self, *args, **kwargs): + """Invoke the low-level RPC with retry, timeout, and metadata.""" + # Note: Due to Python 2 lacking keyword-only arguments we use kwargs to + # extract the retry and timeout params. + timeout_ = _determine_timeout( + self._timeout, + kwargs.pop('timeout', self._timeout), + # Use only the invocation-specified retry only for this, as we only + # want to adjust the timeout deadline if the *user* specified + # a different retry. + kwargs.get('retry', None)) + + retry = kwargs.pop('retry', self._retry) + + if retry is DEFAULT: + retry = self._retry + + # Apply all applicable decorators. + wrapped_func = _apply_decorators(self._target, [retry, timeout_]) + + # Set the metadata for the call using the metadata calculated by + # _prepare_metadata. + if self._metadata is not None: + kwargs['metadata'] = self._metadata + + return wrapped_func(*args, **kwargs) + + +def wrap_method( + func, default_retry=None, default_timeout=None, + metadata=USE_DEFAULT_METADATA): + """Wrap an RPC method with common behavior. + + This applies common error wrapping, retry, and timeout behavior a function. + The wrapped function will take optional ``retry`` and ``timeout`` + arguments. + + For example:: + + import google.api_core.gapic_v1.method + from google.api_core import retry + from google.api_core import timeout + + # The original RPC method. + def get_topic(name, timeout=None): + request = publisher_v2.GetTopicRequest(name=name) + return publisher_stub.GetTopic(request, timeout=timeout) + + default_retry = retry.Retry(deadline=60) + default_timeout = timeout.Timeout(deadline=60) + wrapped_get_topic = google.api_core.gapic_v1.method.wrap_method( + get_topic, default_retry) + + # Execute get_topic with default retry and timeout: + response = wrapped_get_topic() + + # Execute get_topic without doing any retying but with the default + # timeout: + response = wrapped_get_topic(retry=None) + + # Execute get_topic but only retry on 5xx errors: + my_retry = retry.Retry(retry.if_exception_type( + exceptions.InternalServerError)) + response = wrapped_get_topic(retry=my_retry) + + The way this works is by late-wrapping the given function with the retry + and timeout decorators. Essentially, when ``wrapped_get_topic()`` is + called: + + * ``get_topic()`` is first wrapped with the ``timeout`` into + ``get_topic_with_timeout``. + * ``get_topic_with_timeout`` is wrapped with the ``retry`` into + ``get_topic_with_timeout_and_retry()``. + * The final ``get_topic_with_timeout_and_retry`` is called passing through + the ``args`` and ``kwargs``. + + The callstack is therefore:: + + method.__call__() -> + Retry.__call__() -> + Timeout.__call__() -> + wrap_errors() -> + get_topic() + + Note that if ``timeout`` or ``retry`` is ``None``, then they are not + applied to the function. For example, + ``wrapped_get_topic(timeout=None, retry=None)`` is more or less + equivalent to just calling ``get_topic`` but with error re-mapping. + + Args: + func (Callable[Any]): The function to wrap. It should accept an + optional ``timeout`` argument. If ``metadata`` is not ``None``, it + should accept a ``metadata`` argument. + default_retry (Optional[google.api_core.Retry]): The default retry + strategy. If ``None``, the method will not retry by default. + default_timeout (Optional[google.api_core.Timeout]): The default + timeout strategy. Can also be specified as an int or float. If + ``None``, the method will not have timeout specified by default. + metadata (Optional(Mapping[str, str])): A dict of metadata keys and + values. This will be augmented with common ``x-google-api-client`` + metadata. If ``None``, metadata will not be passed to the function + at all, if :attr:`USE_DEFAULT_METADATA` (the default) then only the + common metadata will be provided. + + Returns: + Callable: A new callable that takes optional ``retry`` and ``timeout`` + arguments and applies the common error mapping, retry, timeout, + and metadata behavior to the low-level RPC method. + """ + func = grpc_helpers.wrap_errors(func) + + if metadata is USE_DEFAULT_METADATA: + metadata = {} + + if metadata is not None: + metadata = _prepare_metadata(metadata) + + return general_helpers.wraps(func)( + _GapicCallable(func, default_retry, default_timeout, metadata)) + + +def wrap_with_paging( + func, items_field, request_token_field, response_token_field): + """Wrap an RPC method to return a page iterator. + + Args: + func (Callable): The RPC method. This should already have been + wrapped with common functionality using :func:`wrap_method`. + request (protobuf.Message): The request message. + items_field (str): The field in the response message that has the + items for the page. + request_token_field (str): The field in the request message used to + specify the page token. + response_token_field (str): The field in the response message that has + the token for the next page. + + Returns: + Callable: Returns a callable that when invoked will call the RPC + method and return a + :class:`google.api_core.page_iterator.Iterator`. + """ + @six.wraps(func) + def paged_method(request, **kwargs): + """Wrapper that invokes a method and returns a page iterator.""" + iterator = page_iterator.GRPCIterator( + client=None, + method=functools.partial(func, **kwargs), + request=request, + items_field=items_field, + request_token_field=request_token_field, + response_token_field=response_token_field) + return iterator + + return paged_method diff --git a/google/api_core/general_helpers.py b/google/api_core/general_helpers.py new file mode 100644 index 0000000..0c8e408 --- /dev/null +++ b/google/api_core/general_helpers.py @@ -0,0 +1,32 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Helpers for general Python functionality.""" + +import functools + +import six + + +# functools.partial objects lack several attributes present on real function +# objects. In Python 2 wraps fails on this so use a restricted set instead. +_PARTIAL_VALID_ASSIGNMENTS = ('__doc__',) + + +def wraps(wrapped): + """A functools.wraps helper that handles partial objects on Python 2.""" + if isinstance(wrapped, functools.partial): + return six.wraps(wrapped, assigned=_PARTIAL_VALID_ASSIGNMENTS) + else: + return six.wraps(wrapped) diff --git a/google/api_core/grpc_helpers.py b/google/api_core/grpc_helpers.py new file mode 100644 index 0000000..e4781db --- /dev/null +++ b/google/api_core/grpc_helpers.py @@ -0,0 +1,134 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Helpers for :mod:`grpc`.""" + +import grpc +import six + +from google.api_core import exceptions +from google.api_core import general_helpers +import google.auth +import google.auth.transport.grpc +import google.auth.transport.requests + + +# The list of gRPC Callable interfaces that return iterators. +_STREAM_WRAP_CLASSES = ( + grpc.UnaryStreamMultiCallable, + grpc.StreamStreamMultiCallable, +) + + +def _patch_callable_name(callable_): + """Fix-up gRPC callable attributes. + + gRPC callable lack the ``__name__`` attribute which causes + :func:`functools.wraps` to error. This adds the attribute if needed. + """ + if not hasattr(callable_, '__name__'): + callable_.__name__ = callable_.__class__.__name__ + + +def _wrap_unary_errors(callable_): + """Map errors for Unary-Unary and Stream-Unary gRPC callables.""" + _patch_callable_name(callable_) + + @six.wraps(callable_) + def error_remapped_callable(*args, **kwargs): + try: + return callable_(*args, **kwargs) + except grpc.RpcError as exc: + six.raise_from(exceptions.from_grpc_error(exc), exc) + + return error_remapped_callable + + +def _wrap_stream_errors(callable_): + """Wrap errors for Unary-Stream and Stream-Stream gRPC callables. + + The callables that return iterators require a bit more logic to re-map + errors when iterating. This wraps both the initial invocation and the + iterator of the return value to re-map errors. + """ + _patch_callable_name(callable_) + + @general_helpers.wraps(callable_) + def error_remapped_callable(*args, **kwargs): + try: + result = callable_(*args, **kwargs) + # Note: we are patching the private grpc._channel._Rendezvous._next + # method as magic methods (__next__ in this case) can not be + # patched on a per-instance basis (see + # https://docs.python.org/3/reference/datamodel.html + # #special-lookup). + # In an ideal world, gRPC would return a *specific* interface + # from *StreamMultiCallables, but they return a God class that's + # a combination of basically every interface in gRPC making it + # untenable for us to implement a wrapper object using the same + # interface. + result._next = _wrap_unary_errors(result._next) + return result + except grpc.RpcError as exc: + six.raise_from(exceptions.from_grpc_error(exc), exc) + + return error_remapped_callable + + +def wrap_errors(callable_): + """Wrap a gRPC callable and map :class:`grpc.RpcErrors` to friendly error + classes. + + Errors raised by the gRPC callable are mapped to the appropriate + :class:`google.api_core.exceptions.GoogleAPICallError` subclasses. + The original `grpc.RpcError` (which is usually also a `grpc.Call`) is + available from the ``response`` property on the mapped exception. This + is useful for extracting metadata from the original error. + + Args: + callable_ (Callable): A gRPC callable. + + Returns: + Callable: The wrapped gRPC callable. + """ + if isinstance(callable_, _STREAM_WRAP_CLASSES): + return _wrap_stream_errors(callable_) + else: + return _wrap_unary_errors(callable_) + + +def create_channel(target, credentials=None, scopes=None, **kwargs): + """Create a secure channel with credentials. + + Args: + target (str): The target service address in the format 'hostname:port'. + credentials (google.auth.credentials.Credentials): The credentials. If + not specified, then this function will attempt to ascertain the + credentials from the environment using :func:`google.auth.default`. + scopes (Sequence[str]): A optional list of scopes needed for this + service. These are only used when credentials are not specified and + are passed to :func:`google.auth.default`. + kwargs: Additional key-word args passed to + :func:`google.auth.transport.grpc.secure_authorized_channel`. + + Returns: + grpc.Channel: The created channel. + """ + if credentials is None: + credentials, _ = google.auth.default(scopes=scopes) + + request = google.auth.transport.requests.Request() + + return google.auth.transport.grpc.secure_authorized_channel( + credentials, request, target, **kwargs) diff --git a/google/api_core/operation.py b/google/api_core/operation.py new file mode 100644 index 0000000..2136d95 --- /dev/null +++ b/google/api_core/operation.py @@ -0,0 +1,297 @@ +# Copyright 2016 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Futures for long-running operations returned from Google Cloud APIs. + +These futures can be used to synchronously wait for the result of a +long-running operation using :meth:`Operation.result`: + + +.. code-block:: python + + operation = my_api_client.long_running_method() + result = operation.result() + +Or asynchronously using callbacks and :meth:`Operation.add_done_callback`: + +.. code-block:: python + + operation = my_api_client.long_running_method() + + def my_callback(future): + result = future.result() + + operation.add_done_callback(my_callback) + +""" + +import functools +import threading + +from google.api_core import exceptions +from google.api_core import protobuf_helpers +from google.api_core.future import polling +from google.longrunning import operations_pb2 +from google.protobuf import json_format +from google.rpc import code_pb2 + + +class Operation(polling.PollingFuture): + """A Future for interacting with a Google API Long-Running Operation. + + Args: + operation (google.longrunning.operations_pb2.Operation): The + initial operation. + refresh (Callable[[], Operation]): A callable that returns the + latest state of the operation. + cancel (Callable[[], None]): A callable that tries to cancel + the operation. + result_type (func:`type`): The protobuf type for the operation's + result. + metadata_type (func:`type`): The protobuf type for the operation's + metadata. + """ + + def __init__( + self, operation, refresh, cancel, + result_type, metadata_type=None): + super(Operation, self).__init__() + self._operation = operation + self._refresh = refresh + self._cancel = cancel + self._result_type = result_type + self._metadata_type = metadata_type + self._completion_lock = threading.Lock() + # Invoke this in case the operation came back already complete. + self._set_result_from_operation() + + @property + def operation(self): + """google.longrunning.Operation: The current long-running operation.""" + return self._operation + + @property + def metadata(self): + """google.protobuf.Message: the current operation metadata.""" + if not self._operation.HasField('metadata'): + return None + + return protobuf_helpers.from_any_pb( + self._metadata_type, self._operation.metadata) + + def _set_result_from_operation(self): + """Set the result or exception from the operation if it is complete.""" + # This must be done in a lock to prevent the polling thread + # and main thread from both executing the completion logic + # at the same time. + with self._completion_lock: + # If the operation isn't complete or if the result has already been + # set, do not call set_result/set_exception again. + # Note: self._result_set is set to True in set_result and + # set_exception, in case those methods are invoked directly. + if not self._operation.done or self._result_set: + return + + if self._operation.HasField('response'): + response = protobuf_helpers.from_any_pb( + self._result_type, self._operation.response) + self.set_result(response) + elif self._operation.HasField('error'): + exception = exceptions.GoogleAPICallError( + self._operation.error.message, + errors=(self._operation.error), + response=self._operation) + self.set_exception(exception) + else: + exception = exceptions.GoogleAPICallError( + 'Unexpected state: Long-running operation had neither ' + 'response nor error set.') + self.set_exception(exception) + + def _refresh_and_update(self): + """Refresh the operation and update the result if needed.""" + # If the currently cached operation is done, no need to make another + # RPC as it will not change once done. + if not self._operation.done: + self._operation = self._refresh() + self._set_result_from_operation() + + def done(self): + """Checks to see if the operation is complete. + + Returns: + bool: True if the operation is complete, False otherwise. + """ + self._refresh_and_update() + return self._operation.done + + def cancel(self): + """Attempt to cancel the operation. + + Returns: + bool: True if the cancel RPC was made, False if the operation is + already complete. + """ + if self.done(): + return False + + self._cancel() + return True + + def cancelled(self): + """True if the operation was cancelled.""" + self._refresh_and_update() + return (self._operation.HasField('error') and + self._operation.error.code == code_pb2.CANCELLED) + + +def _refresh_http(api_request, operation_name): + """Refresh an operation using a JSON/HTTP client. + + Args: + api_request (Callable): A callable used to make an API request. This + should generally be + :meth:`google.cloud._http.Connection.api_request`. + operation_name (str): The name of the operation. + + Returns: + google.longrunning.operations_pb2.Operation: The operation. + """ + path = 'operations/{}'.format(operation_name) + api_response = api_request(method='GET', path=path) + return json_format.ParseDict( + api_response, operations_pb2.Operation()) + + +def _cancel_http(api_request, operation_name): + """Cancel an operation using a JSON/HTTP client. + + Args: + api_request (Callable): A callable used to make an API request. This + should generally be + :meth:`google.cloud._http.Connection.api_request`. + operation_name (str): The name of the operation. + """ + path = 'operations/{}:cancel'.format(operation_name) + api_request(method='POST', path=path) + + +def from_http_json(operation, api_request, result_type, **kwargs): + """Create an operation future using a HTTP/JSON client. + + This interacts with the long-running operations `service`_ (specific + to a given API) vis `HTTP/JSON`_. + + .. _HTTP/JSON: https://cloud.google.com/speech/reference/rest/\ + v1beta1/operations#Operation + + Args: + operation (dict): Operation as a dictionary. + api_request (Callable): A callable used to make an API request. This + should generally be + :meth:`google.cloud._http.Connection.api_request`. + result_type (:func:`type`): The protobuf result type. + kwargs: Keyword args passed into the :class:`Operation` constructor. + + Returns: + Operation: The operation future to track the given operation. + """ + operation_proto = json_format.ParseDict( + operation, operations_pb2.Operation()) + refresh = functools.partial( + _refresh_http, api_request, operation_proto.name) + cancel = functools.partial( + _cancel_http, api_request, operation_proto.name) + return Operation(operation_proto, refresh, cancel, result_type, **kwargs) + + +def _refresh_grpc(operations_stub, operation_name): + """Refresh an operation using a gRPC client. + + Args: + operations_stub (google.longrunning.operations_pb2.OperationsStub): + The gRPC operations stub. + operation_name (str): The name of the operation. + + Returns: + google.longrunning.operations_pb2.Operation: The operation. + """ + request_pb = operations_pb2.GetOperationRequest(name=operation_name) + return operations_stub.GetOperation(request_pb) + + +def _cancel_grpc(operations_stub, operation_name): + """Cancel an operation using a gRPC client. + + Args: + operations_stub (google.longrunning.operations_pb2.OperationsStub): + The gRPC operations stub. + operation_name (str): The name of the operation. + """ + request_pb = operations_pb2.CancelOperationRequest(name=operation_name) + operations_stub.CancelOperation(request_pb) + + +def from_grpc(operation, operations_stub, result_type, **kwargs): + """Create an operation future using a gRPC client. + + This interacts with the long-running operations `service`_ (specific + to a given API) via gRPC. + + .. _service: https://github.com/googleapis/googleapis/blob/\ + 050400df0fdb16f63b63e9dee53819044bffc857/\ + google/longrunning/operations.proto#L38 + + Args: + operation (google.longrunning.operations_pb2.Operation): The operation. + operations_stub (google.longrunning.operations_pb2.OperationsStub): + The operations stub. + result_type (:func:`type`): The protobuf result type. + kwargs: Keyword args passed into the :class:`Operation` constructor. + + Returns: + Operation: The operation future to track the given operation. + """ + refresh = functools.partial( + _refresh_grpc, operations_stub, operation.name) + cancel = functools.partial( + _cancel_grpc, operations_stub, operation.name) + return Operation(operation, refresh, cancel, result_type, **kwargs) + + +def from_gapic(operation, operations_client, result_type, **kwargs): + """Create an operation future from a gapic client. + + This interacts with the long-running operations `service`_ (specific + to a given API) via a gapic client. + + .. _service: https://github.com/googleapis/googleapis/blob/\ + 050400df0fdb16f63b63e9dee53819044bffc857/\ + google/longrunning/operations.proto#L38 + + Args: + operation (google.longrunning.operations_pb2.Operation): The operation. + operations_client (google.api_core.operations_v1.OperationsClient): + The operations client. + result_type (:func:`type`): The protobuf result type. + kwargs: Keyword args passed into the :class:`Operation` constructor. + + Returns: + Operation: The operation future to track the given operation. + """ + refresh = functools.partial( + operations_client.get_operation, operation.name) + cancel = functools.partial( + operations_client.cancel_operation, operation.name) + return Operation(operation, refresh, cancel, result_type, **kwargs) diff --git a/google/api_core/operations_v1/__init__.py b/google/api_core/operations_v1/__init__.py new file mode 100644 index 0000000..e9f88f4 --- /dev/null +++ b/google/api_core/operations_v1/__init__.py @@ -0,0 +1,21 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Package for interacting with the google.longrunning.operations meta-API.""" + +from google.api_core.operations_v1.operations_client import OperationsClient + +__all__ = [ + 'OperationsClient' +] diff --git a/google/api_core/operations_v1/operations_client.py b/google/api_core/operations_v1/operations_client.py new file mode 100644 index 0000000..93a823f --- /dev/null +++ b/google/api_core/operations_v1/operations_client.py @@ -0,0 +1,271 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A client for the google.longrunning.operations meta-API. + +This is a client that deals with long-running operations that follow the +pattern outlined by the `Google API Style Guide`_. + +When an API method normally takes long time to complete, it can be designed to +return ``Operation`` to the client, and the client can use this interface to +receive the real response asynchronously by polling the operation resource to +receive the response. + +It is not a separate service, but rather an interface implemented by a larger +service. The protocol-level definition is available at +`google/longrunning/operations.proto`_. Typically, this will be constructed +automatically by another client class to deal with operations. + +.. _Google API Style Guide: + https://cloud.google.com/apis/design/design_pattern + s#long_running_operations +.. _google/longrunning/operations.proto: + https://github.com/googleapis/googleapis/blob/master/google/longrunning + /operations.proto +""" + +from google.api_core import gapic_v1 +from google.api_core.operations_v1 import operations_client_config +from google.longrunning import operations_pb2 + + +class OperationsClient(object): + """Client for interacting with long-running operations within a service. + + Args: + channel (grpc.Channel): The gRPC channel associated with the service + that implements the ``google.longrunning.operations`` interface. + client_config (dict): + A dictionary of call options for each method. If not specified + the default configuration is used. + """ + + def __init__(self, channel, client_config=operations_client_config.config): + # Create the gRPC client stub. + self.operations_stub = operations_pb2.OperationsStub(channel) + + # Create all wrapped methods using the interface configuration. + # The interface config contains all of the default settings for retry + # and timeout for each RPC method. + interfaces = client_config['interfaces'] + interface_config = interfaces['google.longrunning.Operations'] + method_configs = gapic_v1.config.parse_method_configs(interface_config) + + self._get_operation = gapic_v1.method.wrap_method( + self.operations_stub.GetOperation, + default_retry=method_configs['GetOperation'].retry, + default_timeout=method_configs['GetOperation'].timeout) + + self._list_operations = gapic_v1.method.wrap_method( + self.operations_stub.ListOperations, + default_retry=method_configs['ListOperations'].retry, + default_timeout=method_configs['ListOperations'].timeout) + + self._list_operations = gapic_v1.method.wrap_with_paging( + self._list_operations, + 'operations', + 'page_token', + 'next_page_token') + + self._cancel_operation = gapic_v1.method.wrap_method( + self.operations_stub.CancelOperation, + default_retry=method_configs['CancelOperation'].retry, + default_timeout=method_configs['CancelOperation'].timeout) + + self._delete_operation = gapic_v1.method.wrap_method( + self.operations_stub.DeleteOperation, + default_retry=method_configs['DeleteOperation'].retry, + default_timeout=method_configs['DeleteOperation'].timeout) + + # Service calls + def get_operation( + self, name, + retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT): + """Gets the latest state of a long-running operation. + + Clients can use this method to poll the operation result at intervals + as recommended by the API service. + + Example: + >>> from google.api_core import operations_v1 + >>> api = operations_v1.OperationsClient() + >>> name = '' + >>> response = api.get_operation(name) + + Args: + name (str): The name of the operation resource. + retry (google.api_core.retry.Retry): The retry strategy to use + when invoking the RPC. If unspecified, the default retry from + the client configuration will be used. If ``None``, then this + method will not retry the RPC at all. + timeout (float): The amount of time in seconds to wait for the RPC + to complete. Note that if ``retry`` is used, this timeout + applies to each individual attempt and the overall time it + takes for this method to complete may be longer. If + unspecified, the the default timeout in the client + configuration is used. If ``None``, then the RPC method will + not time out. + + Returns: + google.longrunning.operations_pb2.Operation: The state of the + operation. + + Raises: + google.api_core.exceptions.GoogleAPICallError: If an error occurred + while invoking the RPC, the appropriate ``GoogleAPICallError`` + subclass will be raised. + """ + request = operations_pb2.GetOperationRequest(name=name) + return self._get_operation(request, retry=retry, timeout=timeout) + + def list_operations( + self, name, filter_, + retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT): + """ + Lists operations that match the specified filter in the request. + + Example: + >>> from google.api_core import operations_v1 + >>> api = operations_v1.OperationsClient() + >>> name = '' + >>> + >>> # Iterate over all results + >>> for operation in api.list_operations(name): + >>> # process operation + >>> pass + >>> + >>> # Or iterate over results one page at a time + >>> iter = api.list_operations(name) + >>> for page in iter.pages: + >>> for operation in page: + >>> # process operation + >>> pass + + Args: + name (str): The name of the operation collection. + filter_ (str): The standard list filter. + retry (google.api_core.retry.Retry): The retry strategy to use + when invoking the RPC. If unspecified, the default retry from + the client configuration will be used. If ``None``, then this + method will not retry the RPC at all. + timeout (float): The amount of time in seconds to wait for the RPC + to complete. Note that if ``retry`` is used, this timeout + applies to each individual attempt and the overall time it + takes for this method to complete may be longer. If + unspecified, the the default timeout in the client + configuration is used. If ``None``, then the RPC method will + not time out. + + Returns: + google.api_core.page_iterator.Iterator: An iterator that yields + :class:`google.longrunning.operations_pb2.Operation` instances. + + Raises: + google.api_core.exceptions.MethodNotImplemented: If the server + does not support this method. Services are not required to + implement this method. + google.api_core.exceptions.GoogleAPICallError: If an error occurred + while invoking the RPC, the appropriate ``GoogleAPICallError`` + subclass will be raised. + """ + # Create the request object. + request = operations_pb2.ListOperationsRequest( + name=name, filter=filter_) + return self._list_operations(request, retry=retry, timeout=timeout) + + def cancel_operation( + self, name, + retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT): + """Starts asynchronous cancellation on a long-running operation. + + The server makes a best effort to cancel the operation, but success is + not guaranteed. Clients can use :meth:`get_operation` or service- + specific methods to check whether the cancellation succeeded or whether + the operation completed despite cancellation. On successful + cancellation, the operation is not deleted; instead, it becomes an + operation with an ``Operation.error`` value with a + ``google.rpc.Status.code`` of ``1``, corresponding to + ``Code.CANCELLED``. + + Example: + >>> from google.api_core import operations_v1 + >>> api = operations_v1.OperationsClient() + >>> name = '' + >>> api.cancel_operation(name) + + Args: + name (str): The name of the operation resource to be cancelled. + retry (google.api_core.retry.Retry): The retry strategy to use + when invoking the RPC. If unspecified, the default retry from + the client configuration will be used. If ``None``, then this + method will not retry the RPC at all. + timeout (float): The amount of time in seconds to wait for the RPC + to complete. Note that if ``retry`` is used, this timeout + applies to each individual attempt and the overall time it + takes for this method to complete may be longer. If + unspecified, the the default timeout in the client + configuration is used. If ``None``, then the RPC method will + not time out. + + Raises: + google.api_core.exceptions.MethodNotImplemented: If the server + does not support this method. Services are not required to + implement this method. + google.api_core.exceptions.GoogleAPICallError: If an error occurred + while invoking the RPC, the appropriate ``GoogleAPICallError`` + subclass will be raised. + """ + # Create the request object. + request = operations_pb2.CancelOperationRequest(name=name) + self._cancel_operation(request, retry=retry, timeout=timeout) + + def delete_operation( + self, name, + retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT): + """Deletes a long-running operation. + + This method indicates that the client is no longer interested in the + operation result. It does not cancel the operation. + + Example: + >>> from google.api_core import operations_v1 + >>> api = operations_v1.OperationsClient() + >>> name = '' + >>> api.delete_operation(name) + + Args: + name (str): The name of the operation resource to be deleted. + retry (google.api_core.retry.Retry): The retry strategy to use + when invoking the RPC. If unspecified, the default retry from + the client configuration will be used. If ``None``, then this + method will not retry the RPC at all. + timeout (float): The amount of time in seconds to wait for the RPC + to complete. Note that if ``retry`` is used, this timeout + applies to each individual attempt and the overall time it + takes for this method to complete may be longer. If + unspecified, the the default timeout in the client + configuration is used. If ``None``, then the RPC method will + not time out. + + Raises: + google.api_core.exceptions.MethodNotImplemented: If the server + does not support this method. Services are not required to + implement this method. + google.api_core.exceptions.GoogleAPICallError: If an error occurred + while invoking the RPC, the appropriate ``GoogleAPICallError`` + subclass will be raised. + """ + # Create the request object. + request = operations_pb2.DeleteOperationRequest(name=name) + self._delete_operation(request, retry=retry, timeout=timeout) diff --git a/google/api_core/operations_v1/operations_client_config.py b/google/api_core/operations_v1/operations_client_config.py new file mode 100644 index 0000000..bd79fd5 --- /dev/null +++ b/google/api_core/operations_v1/operations_client_config.py @@ -0,0 +1,62 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""gapic configuration for the googe.longrunning.operations client.""" + +config = { + "interfaces": { + "google.longrunning.Operations": { + "retry_codes": { + "idempotent": [ + "DEADLINE_EXCEEDED", + "UNAVAILABLE" + ], + "non_idempotent": [] + }, + "retry_params": { + "default": { + "initial_retry_delay_millis": 100, + "retry_delay_multiplier": 1.3, + "max_retry_delay_millis": 60000, + "initial_rpc_timeout_millis": 20000, + "rpc_timeout_multiplier": 1.0, + "max_rpc_timeout_millis": 600000, + "total_timeout_millis": 600000 + } + }, + "methods": { + "GetOperation": { + "timeout_millis": 60000, + "retry_codes_name": "idempotent", + "retry_params_name": "default" + }, + "ListOperations": { + "timeout_millis": 60000, + "retry_codes_name": "idempotent", + "retry_params_name": "default" + }, + "CancelOperation": { + "timeout_millis": 60000, + "retry_codes_name": "idempotent", + "retry_params_name": "default" + }, + "DeleteOperation": { + "timeout_millis": 60000, + "retry_codes_name": "idempotent", + "retry_params_name": "default" + } + } + } + } +} diff --git a/google/api_core/page_iterator.py b/google/api_core/page_iterator.py new file mode 100644 index 0000000..c3e5a6d --- /dev/null +++ b/google/api_core/page_iterator.py @@ -0,0 +1,522 @@ +# Copyright 2015 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Iterators for paging through paged API methods. + +These iterators simplify the process of paging through API responses +where the request takes a page token and the response is a list of results with +a token for the next page. See `list pagination`_ in the Google API Style Guide +for more details. + +.. _list pagination: + https://cloud.google.com/apis/design/design_patterns#list_pagination + +API clients that have methods that follow the list pagination pattern can +return an :class:`.Iterator`. You can use this iterator to get **all** of +the results across all pages:: + + >>> results_iterator = client.list_resources() + >>> list(results_iterator) # Convert to a list (consumes all values). + +Or you can walk your way through items and call off the search early if +you find what you're looking for (resulting in possibly fewer requests):: + + >>> for resource in results_iterator: + ... print(resource.name) + ... if not resource.is_valid: + ... break + +At any point, you may check the number of items consumed by referencing the +``num_results`` property of the iterator:: + + >>> for my_item in results_iterator: + ... if results_iterator.num_results >= 10: + ... break + +When iterating, not every new item will send a request to the server. +To iterate based on each page of items (where a page corresponds to +a request):: + + >>> for page in results_iterator.pages: + ... print('=' * 20) + ... print(' Page number: {:d}'.format(iterator.page_number)) + ... print(' Items in page: {:d}'.format(page.num_items)) + ... print(' First item: {!r}'.format(next(page))) + ... print('Items remaining: {:d}'.format(page.remaining)) + ... print('Next page token: {}'.format(iterator.next_page_token)) + ==================== + Page number: 1 + Items in page: 1 + First item: <MyItemClass at 0x7f1d3cccf690> + Items remaining: 0 + Next page token: eav1OzQB0OM8rLdGXOEsyQWSG + ==================== + Page number: 2 + Items in page: 19 + First item: <MyItemClass at 0x7f1d3cccffd0> + Items remaining: 18 + Next page token: None + +Then, for each page you can get all the resources on that page by iterating +through it or using :func:`list`:: + + >>> list(page) + [ + <MyItemClass at 0x7fd64a098ad0>, + <MyItemClass at 0x7fd64a098ed0>, + <MyItemClass at 0x7fd64a098e90>, + ] +""" + +import abc + +import six + + +class Page(object): + """Single page of results in an iterator. + + Args: + parent (google.api_core.page_iterator.Iterator): The iterator that owns + the current page. + items (Sequence[Any]): An iterable (that also defines __len__) of items + from a raw API response. + item_to_value (Callable[google.api_core.page_iterator.Iterator, Any]): + Callable to convert an item from the type in the raw API response + into the native object. Will be called with the iterator and a + single item. + """ + + def __init__(self, parent, items, item_to_value): + self._parent = parent + self._num_items = len(items) + self._remaining = self._num_items + self._item_iter = iter(items) + self._item_to_value = item_to_value + + @property + def num_items(self): + """int: Total items in the page.""" + return self._num_items + + @property + def remaining(self): + """int: Remaining items in the page.""" + return self._remaining + + def __iter__(self): + """The :class:`Page` is an iterator of items.""" + return self + + def next(self): + """Get the next value in the page.""" + item = six.next(self._item_iter) + result = self._item_to_value(self._parent, item) + # Since we've successfully got the next value from the + # iterator, we update the number of remaining. + self._remaining -= 1 + return result + + # Alias needed for Python 2/3 support. + __next__ = next + + +def _item_to_value_identity(iterator, item): + """An item to value transformer that returns the item un-changed.""" + # pylint: disable=unused-argument + # We are conforming to the interface defined by Iterator. + return item + + +@six.add_metaclass(abc.ABCMeta) +class Iterator(object): + """A generic class for iterating through API list responses. + + Args: + client(google.cloud.client.Client): The API client. + item_to_value (Callable[google.api_core.page_iterator.Iterator, Any]): + Callable to convert an item from the type in the raw API response + into the native object. Will be called with the iterator and a + single item. + page_token (str): A token identifying a page in a result set to start + fetching results from. + max_results (int): The maximum number of results to fetch. + """ + + def __init__(self, client, item_to_value=_item_to_value_identity, + page_token=None, max_results=None): + self._started = False + self.client = client + self._item_to_value = item_to_value + self.max_results = max_results + # The attributes below will change over the life of the iterator. + self.page_number = 0 + self.next_page_token = page_token + self.num_results = 0 + + @property + def pages(self): + """Iterator of pages in the response. + + returns: + types.GeneratorType[google.api_core.page_iterator.Page]: A + generator of page instances. + + raises: + ValueError: If the iterator has already been started. + """ + if self._started: + raise ValueError('Iterator has already started', self) + self._started = True + return self._page_iter(increment=True) + + def _items_iter(self): + """Iterator for each item returned.""" + for page in self._page_iter(increment=False): + for item in page: + self.num_results += 1 + yield item + + def __iter__(self): + """Iterator for each item returned. + + Returns: + types.GeneratorType[Any]: A generator of items from the API. + + Raises: + ValueError: If the iterator has already been started. + """ + if self._started: + raise ValueError('Iterator has already started', self) + self._started = True + return self._items_iter() + + def _page_iter(self, increment): + """Generator of pages of API responses. + + Args: + increment (bool): Flag indicating if the total number of results + should be incremented on each page. This is useful since a page + iterator will want to increment by results per page while an + items iterator will want to increment per item. + + Yields: + Page: each page of items from the API. + """ + page = self._next_page() + while page is not None: + self.page_number += 1 + if increment: + self.num_results += page.num_items + yield page + page = self._next_page() + + @abc.abstractmethod + def _next_page(self): + """Get the next page in the iterator. + + This does nothing and is intended to be over-ridden by subclasses + to return the next :class:`Page`. + + Raises: + NotImplementedError: Always, this method is abstract. + """ + raise NotImplementedError + + +def _do_nothing_page_start(iterator, page, response): + """Helper to provide custom behavior after a :class:`Page` is started. + + This is a do-nothing stand-in as the default value. + + Args: + iterator (Iterator): An iterator that holds some request info. + page (Page): The page that was just created. + response (Any): The API response for a page. + """ + # pylint: disable=unused-argument + pass + + +class HTTPIterator(Iterator): + """A generic class for iterating through HTTP/JSON API list responses. + + To make an iterator work, you'll need to provide a way to convert a JSON + item returned from the API into the object of your choice (via + ``item_to_value``). You also may need to specify a custom ``items_key`` so + that a given response (containing a page of results) can be parsed into an + iterable page of the actual objects you want. + + Args: + client (google.cloud.client.Client): The API client. + api_request (Callable): The function to use to make API requests. + Generally, this will be + :meth:`google.cloud._http.JSONConnection.api_request`. + path (str): The method path to query for the list of items. + item_to_value (Callable[google.api_core.page_iterator.Iterator, Any]): + Callable to convert an item from the type in the JSON response into + a native object. Will be called with the iterator and a single + item. + items_key (str): The key in the API response where the list of items + can be found. + page_token (str): A token identifying a page in a result set to start + fetching results from. + max_results (int): The maximum number of results to fetch. + extra_params (dict): Extra query string parameters for the + API call. + page_start (Callable[ + google.api_core.page_iterator.Iterator, + google.api_core.page_iterator.Page, dict]): Callable to provide + any special behavior after a new page has been created. Assumed + signature takes the :class:`.Iterator` that started the page, + the :class:`.Page` that was started and the dictionary containing + the page response. + next_token (str): The name of the field used in the response for page + tokens. + + .. autoattribute:: pages + """ + + _DEFAULT_ITEMS_KEY = 'items' + _PAGE_TOKEN = 'pageToken' + _MAX_RESULTS = 'maxResults' + _NEXT_TOKEN = 'nextPageToken' + _RESERVED_PARAMS = frozenset([_PAGE_TOKEN]) + _HTTP_METHOD = 'GET' + + def __init__(self, client, api_request, path, item_to_value, + items_key=_DEFAULT_ITEMS_KEY, + page_token=None, max_results=None, extra_params=None, + page_start=_do_nothing_page_start, next_token=_NEXT_TOKEN): + super(HTTPIterator, self).__init__( + client, item_to_value, page_token=page_token, + max_results=max_results) + self.api_request = api_request + self.path = path + self._items_key = items_key + self.extra_params = extra_params + self._page_start = page_start + self._next_token = next_token + # Verify inputs / provide defaults. + if self.extra_params is None: + self.extra_params = {} + self._verify_params() + + def _verify_params(self): + """Verifies the parameters don't use any reserved parameter. + + Raises: + ValueError: If a reserved parameter is used. + """ + reserved_in_use = self._RESERVED_PARAMS.intersection( + self.extra_params) + if reserved_in_use: + raise ValueError('Using a reserved parameter', + reserved_in_use) + + def _next_page(self): + """Get the next page in the iterator. + + Returns: + Optional[Page]: The next page in the iterator or :data:`None` if + there are no pages left. + """ + if self._has_next_page(): + response = self._get_next_page_response() + items = response.get(self._items_key, ()) + page = Page(self, items, self._item_to_value) + self._page_start(self, page, response) + self.next_page_token = response.get(self._next_token) + return page + else: + return None + + def _has_next_page(self): + """Determines whether or not there are more pages with results. + + Returns: + bool: Whether the iterator has more pages. + """ + if self.page_number == 0: + return True + + if self.max_results is not None: + if self.num_results >= self.max_results: + return False + + return self.next_page_token is not None + + def _get_query_params(self): + """Getter for query parameters for the next request. + + Returns: + dict: A dictionary of query parameters. + """ + result = {} + if self.next_page_token is not None: + result[self._PAGE_TOKEN] = self.next_page_token + if self.max_results is not None: + result[self._MAX_RESULTS] = self.max_results - self.num_results + result.update(self.extra_params) + return result + + def _get_next_page_response(self): + """Requests the next page from the path provided. + + Returns: + dict: The parsed JSON response of the next page's contents. + + Raises: + ValueError: If the HTTP method is not ``GET`` or ``POST``. + """ + params = self._get_query_params() + if self._HTTP_METHOD == 'GET': + return self.api_request( + method=self._HTTP_METHOD, + path=self.path, + query_params=params) + elif self._HTTP_METHOD == 'POST': + return self.api_request( + method=self._HTTP_METHOD, + path=self.path, + data=params) + else: + raise ValueError('Unexpected HTTP method', self._HTTP_METHOD) + + +class _GAXIterator(Iterator): + """A generic class for iterating through Cloud gRPC APIs list responses. + + Any: + client (google.cloud.client.Client): The API client. + page_iter (google.gax.PageIterator): A GAX page iterator to be wrapped + to conform to the :class:`Iterator` interface. + item_to_value (Callable[Iterator, Any]): Callable to convert an item + from the the protobuf response into a native object. Will + be called with the iterator and a single item. + max_results (int): The maximum number of results to fetch. + + .. autoattribute:: pages + """ + + def __init__(self, client, page_iter, item_to_value, max_results=None): + super(_GAXIterator, self).__init__( + client, item_to_value, page_token=page_iter.page_token, + max_results=max_results) + self._gax_page_iter = page_iter + + def _next_page(self): + """Get the next page in the iterator. + + Wraps the response from the :class:`~google.gax.PageIterator` in a + :class:`Page` instance and captures some state at each page. + + Returns: + Optional[Page]: The next page in the iterator or :data:`None` if + there are no pages left. + """ + try: + items = six.next(self._gax_page_iter) + page = Page(self, items, self._item_to_value) + self.next_page_token = self._gax_page_iter.page_token or None + return page + except StopIteration: + return None + + +class GRPCIterator(Iterator): + """A generic class for iterating through gRPC list responses. + + .. note:: The class does not take a ``page_token`` argument because it can + just be specified in the ``request``. + + Args: + client (google.cloud.client.Client): The API client. This unused by + this class, but kept to satisfy the :class:`Iterator` interface. + method (Callable[protobuf.Message]): A bound gRPC method that should + take a single message for the request. + request (protobuf.Message): The request message. + items_field (str): The field in the response message that has the + items for the page. + item_to_value (Callable[GRPCIterator, Any]): Callable to convert an + item from the type in the JSON response into a native object. Will + be called with the iterator and a single item. + request_token_field (str): The field in the request message used to + specify the page token. + response_token_field (str): The field in the response message that has + the token for the next page. + max_results (int): The maximum number of results to fetch. + + .. autoattribute:: pages + """ + + _DEFAULT_REQUEST_TOKEN_FIELD = 'page_token' + _DEFAULT_RESPONSE_TOKEN_FIELD = 'next_page_token' + + def __init__( + self, + client, + method, + request, + items_field, + item_to_value=_item_to_value_identity, + request_token_field=_DEFAULT_REQUEST_TOKEN_FIELD, + response_token_field=_DEFAULT_RESPONSE_TOKEN_FIELD, + max_results=None): + super(GRPCIterator, self).__init__( + client, item_to_value, max_results=max_results) + self._method = method + self._request = request + self._items_field = items_field + self._request_token_field = request_token_field + self._response_token_field = response_token_field + + def _next_page(self): + """Get the next page in the iterator. + + Returns: + Page: The next page in the iterator or :data:`None` if + there are no pages left. + """ + if not self._has_next_page(): + return None + + if self.next_page_token is not None: + setattr( + self._request, self._request_token_field, self.next_page_token) + + response = self._method(self._request) + + self.next_page_token = getattr(response, self._response_token_field) + items = getattr(response, self._items_field) + page = Page(self, items, self._item_to_value) + + return page + + def _has_next_page(self): + """Determines whether or not there are more pages with results. + + Returns: + bool: Whether the iterator has more pages. + """ + if self.page_number == 0: + return True + + if self.max_results is not None: + if self.num_results >= self.max_results: + return False + + # Note: intentionally a falsy check instead of a None check. The RPC + # can return an empty string indicating no more pages. + return True if self.next_page_token else False diff --git a/google/api_core/path_template.py b/google/api_core/path_template.py new file mode 100644 index 0000000..e1cfae3 --- /dev/null +++ b/google/api_core/path_template.py @@ -0,0 +1,198 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Expand and validate URL path templates. + +This module provides the :func:`expand` and :func:`validate` functions for +interacting with Google-style URL `path templates`_ which are commonly used +in Google APIs for `resource names`_. + +.. _path templates: https://github.com/googleapis/googleapis/blob + /57e2d376ac7ef48681554204a3ba78a414f2c533/google/api/http.proto#L212 +.. _resource names: https://cloud.google.com/apis/design/resource_names +""" + +from __future__ import unicode_literals + +import functools +import re + +import six + +# Regular expression for extracting variable parts from a path template. +# The variables can be expressed as: +# +# - "*": a single-segment positional variable, for example: "books/*" +# - "**": a multi-segment positional variable, for example: "shelf/**/book/*" +# - "{name}": a single-segment wildcard named variable, for example +# "books/{name}" +# - "{name=*}: same as above. +# - "{name=**}": a multi-segment wildcard named variable, for example +# "shelf/{name=**}" +# - "{name=/path/*/**}": a multi-segment named variable with a sub-template. +_VARIABLE_RE = re.compile(r""" + ( # Capture the entire variable expression + (?P<positional>\*\*?) # Match & capture * and ** positional variables. + | + # Match & capture named variables {name} + { + (?P<name>[^/]+?) + # Optionally match and capture the named variable's template. + (?:=(?P<template>.+?))? + } + ) + """, re.VERBOSE) + +# Segment expressions used for validating paths against a template. +_SINGLE_SEGMENT_PATTERN = r'([^/]+)' +_MULTI_SEGMENT_PATTERN = r'(.+)' + + +def _expand_variable_match(positional_vars, named_vars, match): + """Expand a matched variable with its value. + + Args: + positional_vars (list): A list of positonal variables. This list will + be modified. + named_vars (dict): A dictionary of named variables. + match (re.Match): A regular expression match. + + Returns: + str: The expanded variable to replace the match. + + Raises: + ValueError: If a positional or named variable is required by the + template but not specified or if an unexpected template expression + is encountered. + """ + positional = match.group('positional') + name = match.group('name') + if name is not None: + try: + return six.text_type(named_vars[name]) + except KeyError: + raise ValueError( + 'Named variable \'{}\' not specified and needed by template ' + '`{}` at position {}'.format( + name, match.string, match.start())) + elif positional is not None: + try: + return six.text_type(positional_vars.pop(0)) + except IndexError: + raise ValueError( + 'Positional variable not specified and needed by template ' + '`{}` at position {}'.format( + match.string, match.start())) + else: + raise ValueError( + 'Unknown template expression {}'.format( + match.group(0))) + + +def expand(tmpl, *args, **kwargs): + """Expand a path template with the given variables. + + ..code-block:: python + + >>> expand('users/*/messages/*', 'me', '123') + users/me/messages/123 + >>> expand('/v1/{name=shelves/*/books/*}', name='shelves/1/books/3') + /v1/shelves/1/books/3 + + Args: + tmpl (str): The path template. + args: The positional variables for the path. + kwargs: The named variables for the path. + + Returns: + str: The expanded path + + Raises: + ValueError: If a positional or named variable is required by the + template but not specified or if an unexpected template expression + is encountered. + """ + replacer = functools.partial(_expand_variable_match, list(args), kwargs) + return _VARIABLE_RE.sub(replacer, tmpl) + + +def _replace_variable_with_pattern(match): + """Replace a variable match with a pattern that can be used to validate it. + + Args: + match (re.Match): A regular expression match + + Returns: + str: A regular expression pattern that can be used to validate the + variable in an expanded path. + + Raises: + ValueError: If an unexpected template expression is encountered. + """ + positional = match.group('positional') + name = match.group('name') + template = match.group('template') + if name is not None: + if not template: + return _SINGLE_SEGMENT_PATTERN.format(name) + elif template == '**': + return _MULTI_SEGMENT_PATTERN.format(name) + else: + return _generate_pattern_for_template(template) + elif positional == '*': + return _SINGLE_SEGMENT_PATTERN + elif positional == '**': + return _MULTI_SEGMENT_PATTERN + else: + raise ValueError( + 'Unknown template expression {}'.format( + match.group(0))) + + +def _generate_pattern_for_template(tmpl): + """Generate a pattern that can validate a path template. + + Args: + tmpl (str): The path template + + Returns: + str: A regular expression pattern that can be used to validate an + expanded path template. + """ + return _VARIABLE_RE.sub(_replace_variable_with_pattern, tmpl) + + +def validate(tmpl, path): + """Validate a path against the path template. + + .. code-block:: python + + >>> validate('users/*/messages/*', 'users/me/messages/123') + True + >>> validate('users/*/messages/*', 'users/me/drafts/123') + False + >>> validate('/v1/{name=shelves/*/books/*}', /v1/shelves/1/books/3) + True + >>> validate('/v1/{name=shelves/*/books/*}', /v1/shelves/1/tapes/3) + False + + Args: + tmpl (str): The path template. + path (str): The expanded path. + + Returns: + bool: True if the path matches. + """ + pattern = _generate_pattern_for_template(tmpl) + '$' + return True if re.match(pattern, path) is not None else False diff --git a/google/api_core/protobuf_helpers.py b/google/api_core/protobuf_helpers.py new file mode 100644 index 0000000..35eb574 --- /dev/null +++ b/google/api_core/protobuf_helpers.py @@ -0,0 +1,38 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Helpers for :mod:`protobuf`.""" + + +def from_any_pb(pb_type, any_pb): + """Converts an ``Any`` protobuf to the specified message type. + + Args: + pb_type (type): the type of the message that any_pb stores an instance + of. + any_pb (google.protobuf.any_pb2.Any): the object to be converted. + + Returns: + pb_type: An instance of the pb_type message. + + Raises: + TypeError: if the message could not be converted. + """ + msg = pb_type() + if not any_pb.Unpack(msg): + raise TypeError( + 'Could not convert {} to {}'.format( + any_pb.__class__.__name__, pb_type.__name__)) + + return msg diff --git a/google/api_core/retry.py b/google/api_core/retry.py new file mode 100644 index 0000000..ccbb883 --- /dev/null +++ b/google/api_core/retry.py @@ -0,0 +1,323 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Helpers for retrying functions with exponential back-off. + +The :class:`Retry` decorator can be used to retry functions that raise +exceptions using exponential backoff. Because a exponential sleep algorithm is +used, the retry is limited by a `deadline`. The deadline is the maxmimum amount +of time a method can block. This is used instead of total number of retries +because it is difficult to ascertain the amount of time a function can block +when using total number of retries and exponential backoff. + +By default, this decorator will retry transient +API errors (see :func:`if_transient_error`). For example: + +.. code-block:: python + + @retry.Retry() + def call_flaky_rpc(): + return client.flaky_rpc() + + # Will retry flaky_rpc() if it raises transient API errors. + result = call_flaky_rpc() + +You can pass a custom predicate to retry on different exceptions, such as +waiting for an eventually consistent item to be available: + +.. code-block:: python + + @retry.Retry(predicate=if_exception_type(exceptions.NotFound)) + def check_if_exists(): + return client.does_thing_exist() + + is_available = check_if_exists() + +Some client library methods apply retry automatically. These methods can accept +a ``retry`` parameter that allows you to configure the behavior: + +.. code-block:: python + + my_retry = retry.Retry(deadline=60) + result = client.some_method(retry=my_retry) + +""" + +from __future__ import unicode_literals + +import datetime +import functools +import logging +import random +import time + +import six + +from google.api_core import datetime_helpers +from google.api_core import exceptions +from google.api_core import general_helpers + +_LOGGER = logging.getLogger(__name__) +_DEFAULT_INITIAL_DELAY = 1.0 # seconds +_DEFAULT_MAXIMUM_DELAY = 60.0 # seconds +_DEFAULT_DELAY_MULTIPLIER = 2.0 +_DEFAULT_DEADLINE = 60.0 * 2.0 # seconds + + +def if_exception_type(*exception_types): + """Creates a predicate to check if the exception is of a given type. + + Args: + exception_types (Sequence[:func:`type`]): The exception types to check + for. + + Returns: + Callable[Exception]: A predicate that returns True if the provided + exception is of the given type(s). + """ + def if_exception_type_predicate(exception): + """Bound predicate for checking an exception type.""" + return isinstance(exception, exception_types) + return if_exception_type_predicate + + +# pylint: disable=invalid-name +# Pylint sees this as a constant, but it is also an alias that should be +# considered a function. +if_transient_error = if_exception_type(( + exceptions.InternalServerError, + exceptions.TooManyRequests)) +"""A predicate that checks if an exception is a transient API error. + +The following server errors are considered transient: + +- :class:`google.api_core.exceptions.InternalServerError` - HTTP 500, gRPC + ``INTERNAL(13)`` and its subclasses. +- :class:`google.api_core.exceptions.TooManyRequests` - HTTP 429 +- :class:`google.api_core.exceptions.ResourceExhausted` - gRPC + ``RESOURCE_EXHAUSTED(8)`` +""" +# pylint: enable=invalid-name + + +def exponential_sleep_generator( + initial, maximum, multiplier=_DEFAULT_DELAY_MULTIPLIER): + """Generates sleep intervals based on the exponential back-off algorithm. + + This implements the `Truncated Exponential Back-off`_ algorithm. + + .. _Truncated Exponential Back-off: + https://cloud.google.com/storage/docs/exponential-backoff + + Args: + initial (float): The minimum about of time to delay. This must + be greater than 0. + maximum (float): The maximum about of time to delay. + multiplier (float): The multiplier applied to the delay. + + Yields: + float: successive sleep intervals. + """ + delay = initial + while True: + # Introduce jitter by yielding a delay that is uniformly distributed + # to average out to the delay time. + yield min(random.uniform(0.0, delay * 2.0), maximum) + delay = delay * multiplier + + +def retry_target(target, predicate, sleep_generator, deadline, on_error=None): + """Call a function and retry if it fails. + + This is the lowest-level retry helper. Generally, you'll use the + higher-level retry helper :class:`Retry`. + + Args: + target(Callable): The function to call and retry. This must be a + nullary function - apply arguments with `functools.partial`. + predicate (Callable[Exception]): A callable used to determine if an + exception raised by the target should be considered retryable. + It should return True to retry or False otherwise. + sleep_generator (Iterable[float]): An infinite iterator that determines + how long to sleep between retries. + deadline (float): How long to keep retrying the target. + on_error (Callable): A function to call while processing a retryable + exception. Any error raised by this function will *not* be + caught. + + Returns: + Any: the return value of the target function. + + Raises: + google.api_core.RetryError: If the deadline is exceeded while retrying. + ValueError: If the sleep generator stops yielding values. + Exception: If the target raises a method that isn't retryable. + """ + if deadline is not None: + deadline_datetime = ( + datetime_helpers.utcnow() + datetime.timedelta(seconds=deadline)) + else: + deadline_datetime = None + + last_exc = None + + for sleep in sleep_generator: + try: + return target() + + # pylint: disable=broad-except + # This function explicitly must deal with broad exceptions. + except Exception as exc: + if not predicate(exc): + raise + last_exc = exc + if on_error is not None: + on_error(exc) + + now = datetime_helpers.utcnow() + if deadline_datetime is not None and deadline_datetime < now: + six.raise_from( + exceptions.RetryError( + 'Deadline of {:.1f}s exceeded while calling {}'.format( + deadline, target), + last_exc), + last_exc) + + _LOGGER.debug('Retrying due to {}, sleeping {:.1f}s ...'.format( + last_exc, sleep)) + time.sleep(sleep) + + raise ValueError('Sleep generator stopped yielding sleep values.') + + +@six.python_2_unicode_compatible +class Retry(object): + """Exponential retry decorator. + + This class is a decorator used to add exponential back-off retry behavior + to an RPC call. + + Although the default behavior is to retry transient API errors, a + different predicate can be provided to retry other exceptions. + + Args: + predicate (Callable[Exception]): A callable that should return ``True`` + if the given exception is retryable. + initial (float): The minimum about of time to delay in seconds. This + must be greater than 0. + maximum (float): The maximum about of time to delay in seconds. + multiplier (float): The multiplier applied to the delay. + deadline (float): How long to keep retrying in seconds. + """ + def __init__( + self, + predicate=if_transient_error, + initial=_DEFAULT_INITIAL_DELAY, + maximum=_DEFAULT_MAXIMUM_DELAY, + multiplier=_DEFAULT_DELAY_MULTIPLIER, + deadline=_DEFAULT_DEADLINE): + self._predicate = predicate + self._initial = initial + self._multiplier = multiplier + self._maximum = maximum + self._deadline = deadline + + def __call__(self, func, on_error=None): + """Wrap a callable with retry behavior. + + Args: + func (Callable): The callable to add retry behavior to. + on_error (Callable): A function to call while processing a + retryable exception. Any error raised by this function will + *not* be caught. + + Returns: + Callable: A callable that will invoke ``func`` with retry + behavior. + """ + @general_helpers.wraps(func) + def retry_wrapped_func(*args, **kwargs): + """A wrapper that calls target function with retry.""" + target = functools.partial(func, *args, **kwargs) + sleep_generator = exponential_sleep_generator( + self._initial, self._maximum, multiplier=self._multiplier) + return retry_target( + target, + self._predicate, + sleep_generator, + self._deadline, + on_error=on_error, + ) + + return retry_wrapped_func + + def with_deadline(self, deadline): + """Return a copy of this retry with the given deadline. + + Args: + deadline (float): How long to keep retrying. + + Returns: + Retry: A new retry instance with the given deadline. + """ + return Retry( + predicate=self._predicate, + initial=self._initial, + maximum=self._maximum, + multiplier=self._multiplier, + deadline=deadline) + + def with_predicate(self, predicate): + """Return a copy of this retry with the given predicate. + + Args: + predicate (Callable[Exception]): A callable that should return + ``True`` if the given exception is retryable. + + Returns: + Retry: A new retry instance with the given predicate. + """ + return Retry( + predicate=predicate, + initial=self._initial, + maximum=self._maximum, + multiplier=self._multiplier, + deadline=self._deadline) + + def with_delay( + self, initial=None, maximum=None, multiplier=None): + """Return a copy of this retry with the given delay options. + + Args: + initial (float): The minimum about of time to delay. This must + be greater than 0. + maximum (float): The maximum about of time to delay. + multiplier (float): The multiplier applied to the delay. + + Returns: + Retry: A new retry instance with the given predicate. + """ + return Retry( + predicate=self._predicate, + initial=initial if initial is not None else self._initial, + maximum=maximum if maximum is not None else self._maximum, + multiplier=multiplier if maximum is not None else self._multiplier, + deadline=self._deadline) + + def __str__(self): + return ( + '<Retry predicate={}, initial={:.1f}, maximum={:.1f}, ' + 'multiplier={:.1f}, deadline={:.1f}>'.format( + self._predicate, self._initial, self._maximum, + self._multiplier, self._deadline)) diff --git a/google/api_core/timeout.py b/google/api_core/timeout.py new file mode 100644 index 0000000..12fb9fc --- /dev/null +++ b/google/api_core/timeout.py @@ -0,0 +1,215 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Decorators for applying timeout arguments to functions. + +These decorators are used to wrap API methods to apply either a constant +or exponential timeout argument. + +For example, imagine an API method that can take a while to return results, +such as one that might block until a resource is ready: + +.. code-block:: python + + def is_thing_ready(timeout=None): + response = requests.get('https://example.com/is_thing_ready') + response.raise_for_status() + return response.json() + +This module allows a function like this to be wrapped so that timeouts are +automatically determined, for example: + +.. code-block:: python + + timeout_ = timeout.ExponentialTimeout() + is_thing_ready_with_timeout = timeout_(is_thing_ready) + + for n in range(10): + try: + is_thing_ready_with_timeout({'example': 'data'}) + except: + pass + +In this example the first call to ``is_thing_ready`` will have a relatively +small timeout (like 1 second). If the resource is available and the request +completes quickly, the loop exits. But, if the resource isn't yet available +and the request times out, it'll be retried - this time with a larger timeout. + +In the broader context these decorators are typically combined with +:mod:`google.api_core.retry` to implement API methods with a signature that +matches ``api_method(request, timeout=None, retry=None)``. +""" + +from __future__ import unicode_literals + +import datetime + +import six + +from google.api_core import datetime_helpers +from google.api_core import general_helpers + +_DEFAULT_INITIAL_TIMEOUT = 5.0 # seconds +_DEFAULT_MAXIMUM_TIMEOUT = 30.0 # seconds +_DEFAULT_TIMEOUT_MULTIPLIER = 2.0 +# If specified, must be in seconds. If none, deadline is not used in the +# timeout calculation. +_DEFAULT_DEADLINE = None + + +@six.python_2_unicode_compatible +class ConstantTimeout(object): + """A decorator that adds a constant timeout argument. + + This is effectively equivalent to + ``functools.partial(func, timeout=timeout)``. + + Args: + timeout (Optional[float]): the timeout (in seconds) to applied to the + wrapped function. If `None`, the target function is expected to + never timeout. + """ + def __init__(self, timeout=None): + self._timeout = timeout + + def __call__(self, func): + """Apply the timeout decorator. + + Args: + func (Callable): The function to apply the timeout argument to. + This function must accept a timeout keyword argument. + + Returns: + Callable: The wrapped function. + """ + @general_helpers.wraps(func) + def func_with_timeout(*args, **kwargs): + """Wrapped function that adds timeout.""" + kwargs['timeout'] = self._timeout + return func(*args, **kwargs) + return func_with_timeout + + def __str__(self): + return '<ConstantTimeout timeout={:.1f}>'.format(self._timeout) + + +def _exponential_timeout_generator(initial, maximum, multiplier, deadline): + """A generator that yields exponential timeout values. + + Args: + initial (float): The initial timeout. + maximum (float): The maximum timeout. + multiplier (float): The multiplier applied to the timeout. + deadline (float): The overall deadline across all invocations. + + Yields: + float: A timeout value. + """ + if deadline is not None: + deadline_datetime = ( + datetime_helpers.utcnow() + + datetime.timedelta(seconds=deadline)) + else: + deadline_datetime = datetime.datetime.max + + timeout = initial + while True: + now = datetime_helpers.utcnow() + yield min( + # The calculated timeout based on invocations. + timeout, + # The set maximum timeout. + maximum, + # The remaining time before the deadline is reached. + float((deadline_datetime - now).seconds)) + timeout = timeout * multiplier + + +@six.python_2_unicode_compatible +class ExponentialTimeout(object): + """A decorator that adds an exponentially increasing timeout argument. + + This is useful if a function is called multiple times. Each time the + function is called this decorator will calculate a new timeout parameter + based on the the number of times the function has been called. + + For example + + .. code-block:: python + + Args: + initial (float): The initial timeout to pass. + maximum (float): The maximum timeout for any one call. + multiplier (float): The multiplier applied to the timeout for each + invocation. + deadline (Optional[float]): The overall deadline across all + invocations. This is used to prevent a very large calculated + timeout from pushing the overall execution time over the deadline. + This is especially useful in conjuction with + :mod:`google.api_core.retry`. If ``None``, the timeouts will not + be adjusted to accomodate an overall deadline. + """ + def __init__( + self, + initial=_DEFAULT_INITIAL_TIMEOUT, + maximum=_DEFAULT_MAXIMUM_TIMEOUT, + multiplier=_DEFAULT_TIMEOUT_MULTIPLIER, + deadline=_DEFAULT_DEADLINE): + self._initial = initial + self._maximum = maximum + self._multiplier = multiplier + self._deadline = deadline + + def with_deadline(self, deadline): + """Return a copy of this teimout with the given deadline. + + Args: + deadline (float): The overall deadline across all invocations. + + Returns: + ExponentialTimeout: A new instance with the given deadline. + """ + return ExponentialTimeout( + initial=self._initial, + maximum=self._maximum, + multiplier=self._multiplier, + deadline=deadline) + + def __call__(self, func): + """Apply the timeout decorator. + + Args: + func (Callable): The function to apply the timeout argument to. + This function must accept a timeout keyword argument. + + Returns: + Callable: The wrapped function. + """ + timeouts = _exponential_timeout_generator( + self._initial, self._maximum, self._multiplier, self._deadline) + + @general_helpers.wraps(func) + def func_with_timeout(*args, **kwargs): + """Wrapped function that adds timeout.""" + kwargs['timeout'] = next(timeouts) + return func(*args, **kwargs) + + return func_with_timeout + + def __str__(self): + return ( + '<ExponentialTimeout initial={:.1f}, maximum={:.1f}, ' + 'multiplier={:.1f}, deadline={:.1f}>'.format( + self._initial, self._maximum, self._multiplier, + self._deadline)) |