aboutsummaryrefslogtreecommitdiff
path: root/google/api_core/operations_v1/abstract_operations_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'google/api_core/operations_v1/abstract_operations_client.py')
-rw-r--r--google/api_core/operations_v1/abstract_operations_client.py564
1 files changed, 564 insertions, 0 deletions
diff --git a/google/api_core/operations_v1/abstract_operations_client.py b/google/api_core/operations_v1/abstract_operations_client.py
new file mode 100644
index 0000000..631094e
--- /dev/null
+++ b/google/api_core/operations_v1/abstract_operations_client.py
@@ -0,0 +1,564 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from collections import OrderedDict
+from distutils import util
+import os
+import re
+from typing import Dict, Optional, Sequence, Tuple, Type, Union
+
+from google.api_core import client_options as client_options_lib # type: ignore
+from google.api_core import gapic_v1 # type: ignore
+from google.api_core import retry as retries # type: ignore
+from google.api_core.operations_v1 import pagers
+from google.api_core.operations_v1.transports.base import (
+ DEFAULT_CLIENT_INFO,
+ OperationsTransport,
+)
+from google.api_core.operations_v1.transports.rest import OperationsRestTransport
+from google.auth import credentials as ga_credentials # type: ignore
+from google.auth.exceptions import MutualTLSChannelError # type: ignore
+from google.auth.transport import mtls # type: ignore
+from google.longrunning import operations_pb2
+from google.oauth2 import service_account # type: ignore
+
+OptionalRetry = Union[retries.Retry, object]
+
+
+class AbstractOperationsClientMeta(type):
+ """Metaclass for the Operations client.
+
+ This provides class-level methods for building and retrieving
+ support objects (e.g. transport) without polluting the client instance
+ objects.
+ """
+
+ _transport_registry = OrderedDict() # type: Dict[str, Type[OperationsTransport]]
+ _transport_registry["rest"] = OperationsRestTransport
+
+ def get_transport_class(
+ cls, label: Optional[str] = None,
+ ) -> Type[OperationsTransport]:
+ """Returns an appropriate transport class.
+
+ Args:
+ label: The name of the desired transport. If none is
+ provided, then the first transport in the registry is used.
+
+ Returns:
+ The transport class to use.
+ """
+ # If a specific transport is requested, return that one.
+ if label:
+ return cls._transport_registry[label]
+
+ # No transport is requested; return the default (that is, the first one
+ # in the dictionary).
+ return next(iter(cls._transport_registry.values()))
+
+
+class AbstractOperationsClient(metaclass=AbstractOperationsClientMeta):
+ """Manages long-running operations with an API service.
+
+ When an API method normally takes long time to complete, it can be
+ designed to return [Operation][google.api_core.operations_v1.Operation] to the
+ client, and the client can use this interface to receive the real
+ response asynchronously by polling the operation resource, or pass
+ the operation resource to another API (such as Google Cloud Pub/Sub
+ API) to receive the response. Any API service that returns
+ long-running operations should implement the ``Operations``
+ interface so developers can have a consistent client experience.
+ """
+
+ @staticmethod
+ def _get_default_mtls_endpoint(api_endpoint):
+ """Converts api endpoint to mTLS endpoint.
+
+ Convert "*.sandbox.googleapis.com" and "*.googleapis.com" to
+ "*.mtls.sandbox.googleapis.com" and "*.mtls.googleapis.com" respectively.
+ Args:
+ api_endpoint (Optional[str]): the api endpoint to convert.
+ Returns:
+ str: converted mTLS api endpoint.
+ """
+ if not api_endpoint:
+ return api_endpoint
+
+ mtls_endpoint_re = re.compile(
+ r"(?P<name>[^.]+)(?P<mtls>\.mtls)?(?P<sandbox>\.sandbox)?(?P<googledomain>\.googleapis\.com)?"
+ )
+
+ m = mtls_endpoint_re.match(api_endpoint)
+ name, mtls, sandbox, googledomain = m.groups()
+ if mtls or not googledomain:
+ return api_endpoint
+
+ if sandbox:
+ return api_endpoint.replace(
+ "sandbox.googleapis.com", "mtls.sandbox.googleapis.com"
+ )
+
+ return api_endpoint.replace(".googleapis.com", ".mtls.googleapis.com")
+
+ DEFAULT_ENDPOINT = "longrunning.googleapis.com"
+ DEFAULT_MTLS_ENDPOINT = _get_default_mtls_endpoint.__func__( # type: ignore
+ DEFAULT_ENDPOINT
+ )
+
+ @classmethod
+ def from_service_account_info(cls, info: dict, *args, **kwargs):
+ """Creates an instance of this client using the provided credentials
+ info.
+
+ Args:
+ info (dict): The service account private key info.
+ args: Additional arguments to pass to the constructor.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ AbstractOperationsClient: The constructed client.
+ """
+ credentials = service_account.Credentials.from_service_account_info(info)
+ kwargs["credentials"] = credentials
+ return cls(*args, **kwargs)
+
+ @classmethod
+ def from_service_account_file(cls, filename: str, *args, **kwargs):
+ """Creates an instance of this client using the provided credentials
+ file.
+
+ Args:
+ filename (str): The path to the service account private key json
+ file.
+ args: Additional arguments to pass to the constructor.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ AbstractOperationsClient: The constructed client.
+ """
+ credentials = service_account.Credentials.from_service_account_file(filename)
+ kwargs["credentials"] = credentials
+ return cls(*args, **kwargs)
+
+ from_service_account_json = from_service_account_file
+
+ @property
+ def transport(self) -> OperationsTransport:
+ """Returns the transport used by the client instance.
+
+ Returns:
+ OperationsTransport: The transport used by the client
+ instance.
+ """
+ return self._transport
+
+ @staticmethod
+ def common_billing_account_path(billing_account: str,) -> str:
+ """Returns a fully-qualified billing_account string."""
+ return "billingAccounts/{billing_account}".format(
+ billing_account=billing_account,
+ )
+
+ @staticmethod
+ def parse_common_billing_account_path(path: str) -> Dict[str, str]:
+ """Parse a billing_account path into its component segments."""
+ m = re.match(r"^billingAccounts/(?P<billing_account>.+?)$", path)
+ return m.groupdict() if m else {}
+
+ @staticmethod
+ def common_folder_path(folder: str,) -> str:
+ """Returns a fully-qualified folder string."""
+ return "folders/{folder}".format(folder=folder,)
+
+ @staticmethod
+ def parse_common_folder_path(path: str) -> Dict[str, str]:
+ """Parse a folder path into its component segments."""
+ m = re.match(r"^folders/(?P<folder>.+?)$", path)
+ return m.groupdict() if m else {}
+
+ @staticmethod
+ def common_organization_path(organization: str,) -> str:
+ """Returns a fully-qualified organization string."""
+ return "organizations/{organization}".format(organization=organization,)
+
+ @staticmethod
+ def parse_common_organization_path(path: str) -> Dict[str, str]:
+ """Parse a organization path into its component segments."""
+ m = re.match(r"^organizations/(?P<organization>.+?)$", path)
+ return m.groupdict() if m else {}
+
+ @staticmethod
+ def common_project_path(project: str,) -> str:
+ """Returns a fully-qualified project string."""
+ return "projects/{project}".format(project=project,)
+
+ @staticmethod
+ def parse_common_project_path(path: str) -> Dict[str, str]:
+ """Parse a project path into its component segments."""
+ m = re.match(r"^projects/(?P<project>.+?)$", path)
+ return m.groupdict() if m else {}
+
+ @staticmethod
+ def common_location_path(project: str, location: str,) -> str:
+ """Returns a fully-qualified location string."""
+ return "projects/{project}/locations/{location}".format(
+ project=project, location=location,
+ )
+
+ @staticmethod
+ def parse_common_location_path(path: str) -> Dict[str, str]:
+ """Parse a location path into its component segments."""
+ m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path)
+ return m.groupdict() if m else {}
+
+ def __init__(
+ self,
+ *,
+ credentials: Optional[ga_credentials.Credentials] = None,
+ transport: Union[str, OperationsTransport, None] = None,
+ client_options: Optional[client_options_lib.ClientOptions] = None,
+ client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
+ ) -> None:
+ """Instantiates the operations client.
+
+ Args:
+ credentials (Optional[google.auth.credentials.Credentials]): The
+ authorization credentials to attach to requests. These
+ credentials identify the application to the service; if none
+ are specified, the client will attempt to ascertain the
+ credentials from the environment.
+ transport (Union[str, OperationsTransport]): The
+ transport to use. If set to None, a transport is chosen
+ automatically.
+ client_options (google.api_core.client_options.ClientOptions): Custom options for the
+ client. It won't take effect if a ``transport`` instance is provided.
+ (1) The ``api_endpoint`` property can be used to override the
+ default endpoint provided by the client. GOOGLE_API_USE_MTLS_ENDPOINT
+ environment variable can also be used to override the endpoint:
+ "always" (always use the default mTLS endpoint), "never" (always
+ use the default regular endpoint) and "auto" (auto switch to the
+ default mTLS endpoint if client certificate is present, this is
+ the default value). However, the ``api_endpoint`` property takes
+ precedence if provided.
+ (2) If GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
+ is "true", then the ``client_cert_source`` property can be used
+ to provide client certificate for mutual TLS transport. If
+ not provided, the default SSL client certificate will be used if
+ present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
+ set, no client certificate will be used.
+ client_info (google.api_core.gapic_v1.client_info.ClientInfo):
+ The client info used to send a user-agent string along with
+ API requests. If ``None``, then default info will be used.
+ Generally, you only need to set this if you're developing
+ your own client library.
+
+ Raises:
+ google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
+ creation failed for any reason.
+ """
+ if isinstance(client_options, dict):
+ client_options = client_options_lib.from_dict(client_options)
+ if client_options is None:
+ client_options = client_options_lib.ClientOptions()
+
+ # Create SSL credentials for mutual TLS if needed.
+ use_client_cert = bool(
+ util.strtobool(os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false"))
+ )
+
+ client_cert_source_func = None
+ is_mtls = False
+ if use_client_cert:
+ if client_options.client_cert_source:
+ is_mtls = True
+ client_cert_source_func = client_options.client_cert_source
+ else:
+ is_mtls = mtls.has_default_client_cert_source()
+ if is_mtls:
+ client_cert_source_func = mtls.default_client_cert_source()
+ else:
+ client_cert_source_func = None
+
+ # Figure out which api endpoint to use.
+ if client_options.api_endpoint is not None:
+ api_endpoint = client_options.api_endpoint
+ else:
+ use_mtls_env = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto")
+ if use_mtls_env == "never":
+ api_endpoint = self.DEFAULT_ENDPOINT
+ elif use_mtls_env == "always":
+ api_endpoint = self.DEFAULT_MTLS_ENDPOINT
+ elif use_mtls_env == "auto":
+ if is_mtls:
+ api_endpoint = self.DEFAULT_MTLS_ENDPOINT
+ else:
+ api_endpoint = self.DEFAULT_ENDPOINT
+ else:
+ raise MutualTLSChannelError(
+ "Unsupported GOOGLE_API_USE_MTLS_ENDPOINT value. Accepted "
+ "values: never, auto, always"
+ )
+
+ # Save or instantiate the transport.
+ # Ordinarily, we provide the transport, but allowing a custom transport
+ # instance provides an extensibility point for unusual situations.
+ if isinstance(transport, OperationsTransport):
+ # transport is a OperationsTransport instance.
+ if credentials or client_options.credentials_file:
+ raise ValueError(
+ "When providing a transport instance, "
+ "provide its credentials directly."
+ )
+ if client_options.scopes:
+ raise ValueError(
+ "When providing a transport instance, provide its scopes "
+ "directly."
+ )
+ self._transport = transport
+ else:
+ Transport = type(self).get_transport_class(transport)
+ self._transport = Transport(
+ credentials=credentials,
+ credentials_file=client_options.credentials_file,
+ host=api_endpoint,
+ scopes=client_options.scopes,
+ client_cert_source_for_mtls=client_cert_source_func,
+ quota_project_id=client_options.quota_project_id,
+ client_info=client_info,
+ always_use_jwt_access=True,
+ )
+
+ def list_operations(
+ self,
+ name: str,
+ filter_: Optional[str] = None,
+ *,
+ page_size: Optional[int] = None,
+ page_token: Optional[str] = None,
+ retry: OptionalRetry = gapic_v1.method.DEFAULT,
+ timeout: Optional[float] = None,
+ metadata: Sequence[Tuple[str, str]] = (),
+ ) -> pagers.ListOperationsPager:
+ r"""Lists operations that match the specified filter in the request.
+ If the server doesn't support this method, it returns
+ ``UNIMPLEMENTED``.
+
+ NOTE: the ``name`` binding allows API services to override the
+ binding to use different resource name schemes, such as
+ ``users/*/operations``. To override the binding, API services
+ can add a binding such as ``"/v1/{name=users/*}/operations"`` to
+ their service configuration. For backwards compatibility, the
+ default name includes the operations collection id, however
+ overriding users must ensure the name binding is the parent
+ resource, without the operations collection id.
+
+ Args:
+ name (str):
+ The name of the operation's parent
+ resource.
+ filter_ (str):
+ The standard list filter.
+ This corresponds to the ``filter`` field
+ on the ``request`` instance; if ``request`` is provided, this
+ should not be set.
+ retry (google.api_core.retry.Retry): Designation of what errors, if any,
+ should be retried.
+ timeout (float): The timeout for this request.
+ metadata (Sequence[Tuple[str, str]]): Strings which should be
+ sent along with the request as metadata.
+
+ Returns:
+ google.api_core.operations_v1.pagers.ListOperationsPager:
+ The response message for
+ [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].
+
+ Iterating over this object will yield results and
+ resolve additional pages automatically.
+
+ """
+ # Create a protobuf request object.
+ request = operations_pb2.ListOperationsRequest(name=name, filter=filter_)
+ if page_size is not None:
+ request.page_size = page_size
+ if page_token is not None:
+ request.page_token = page_token
+
+ # Wrap the RPC method; this adds retry and timeout information,
+ # and friendly error handling.
+ rpc = self._transport._wrapped_methods[self._transport.list_operations]
+
+ # Certain fields should be provided within the metadata header;
+ # add these here.
+ metadata = tuple(metadata or ()) + (
+ gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
+ )
+
+ # Send the request.
+ response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,)
+
+ # This method is paged; wrap the response in a pager, which provides
+ # an `__iter__` convenience method.
+ response = pagers.ListOperationsPager(
+ method=rpc, request=request, response=response, metadata=metadata,
+ )
+
+ # Done; return the response.
+ return response
+
+ def get_operation(
+ self,
+ name: str,
+ *,
+ retry: OptionalRetry = gapic_v1.method.DEFAULT,
+ timeout: Optional[float] = None,
+ metadata: Sequence[Tuple[str, str]] = (),
+ ) -> operations_pb2.Operation:
+ r"""Gets the latest state of a long-running operation.
+ Clients can use this method to poll the operation result
+ at intervals as recommended by the API service.
+
+ Args:
+ name (str):
+ The name of the operation resource.
+ retry (google.api_core.retry.Retry): Designation of what errors, if any,
+ should be retried.
+ timeout (float): The timeout for this request.
+ metadata (Sequence[Tuple[str, str]]): Strings which should be
+ sent along with the request as metadata.
+
+ Returns:
+ google.longrunning.operations_pb2.Operation:
+ This resource represents a long-
+ unning operation that is the result of a
+ network API call.
+
+ """
+
+ request = operations_pb2.GetOperationRequest(name=name)
+
+ # Wrap the RPC method; this adds retry and timeout information,
+ # and friendly error handling.
+ rpc = self._transport._wrapped_methods[self._transport.get_operation]
+
+ # Certain fields should be provided within the metadata header;
+ # add these here.
+ metadata = tuple(metadata or ()) + (
+ gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
+ )
+
+ # Send the request.
+ response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,)
+
+ # Done; return the response.
+ return response
+
+ def delete_operation(
+ self,
+ name: str,
+ *,
+ retry: OptionalRetry = gapic_v1.method.DEFAULT,
+ timeout: Optional[float] = None,
+ metadata: Sequence[Tuple[str, str]] = (),
+ ) -> None:
+ r"""Deletes a long-running operation. This method indicates that the
+ client is no longer interested in the operation result. It does
+ not cancel the operation. If the server doesn't support this
+ method, it returns ``google.rpc.Code.UNIMPLEMENTED``.
+
+ Args:
+ name (str):
+ The name of the operation resource to
+ be deleted.
+
+ This corresponds to the ``name`` field
+ on the ``request`` instance; if ``request`` is provided, this
+ should not be set.
+ retry (google.api_core.retry.Retry): Designation of what errors, if any,
+ should be retried.
+ timeout (float): The timeout for this request.
+ metadata (Sequence[Tuple[str, str]]): Strings which should be
+ sent along with the request as metadata.
+ """
+ # Create the request object.
+ request = operations_pb2.DeleteOperationRequest(name=name)
+
+ # Wrap the RPC method; this adds retry and timeout information,
+ # and friendly error handling.
+ rpc = self._transport._wrapped_methods[self._transport.delete_operation]
+
+ # Certain fields should be provided within the metadata header;
+ # add these here.
+ metadata = tuple(metadata or ()) + (
+ gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
+ )
+
+ # Send the request.
+ rpc(
+ request, retry=retry, timeout=timeout, metadata=metadata,
+ )
+
+ def cancel_operation(
+ self,
+ name: Optional[str] = None,
+ *,
+ retry: OptionalRetry = gapic_v1.method.DEFAULT,
+ timeout: Optional[float] = None,
+ metadata: Sequence[Tuple[str, str]] = (),
+ ) -> None:
+ r"""Starts asynchronous cancellation on a long-running operation.
+ The server makes a best effort to cancel the operation, but
+ success is not guaranteed. If the server doesn't support this
+ method, it returns ``google.rpc.Code.UNIMPLEMENTED``. Clients
+ can use
+ [Operations.GetOperation][google.api_core.operations_v1.Operations.GetOperation]
+ or other methods to check whether the cancellation succeeded or
+ whether the operation completed despite cancellation. On
+ successful cancellation, the operation is not deleted; instead,
+ it becomes an operation with an
+ [Operation.error][google.api_core.operations_v1.Operation.error] value with
+ a [google.rpc.Status.code][google.rpc.Status.code] of 1,
+ corresponding to ``Code.CANCELLED``.
+
+ Args:
+ name (str):
+ The name of the operation resource to
+ be cancelled.
+
+ This corresponds to the ``name`` field
+ on the ``request`` instance; if ``request`` is provided, this
+ should not be set.
+ retry (google.api_core.retry.Retry): Designation of what errors, if any,
+ should be retried.
+ timeout (float): The timeout for this request.
+ metadata (Sequence[Tuple[str, str]]): Strings which should be
+ sent along with the request as metadata.
+ """
+ # Create the request object.
+ request = operations_pb2.CancelOperationRequest(name=name)
+
+ # Wrap the RPC method; this adds retry and timeout information,
+ # and friendly error handling.
+ rpc = self._transport._wrapped_methods[self._transport.cancel_operation]
+
+ # Certain fields should be provided within the metadata header;
+ # add these here.
+ metadata = tuple(metadata or ()) + (
+ gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
+ )
+
+ # Send the request.
+ rpc(
+ request, retry=retry, timeout=timeout, metadata=metadata,
+ )