diff options
Diffstat (limited to 'google/api_core/operations_v1/transports/rest.py')
-rw-r--r-- | google/api_core/operations_v1/transports/rest.py | 455 |
1 files changed, 455 insertions, 0 deletions
diff --git a/google/api_core/operations_v1/transports/rest.py b/google/api_core/operations_v1/transports/rest.py new file mode 100644 index 0000000..27ed766 --- /dev/null +++ b/google/api_core/operations_v1/transports/rest.py @@ -0,0 +1,455 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Callable, Dict, Optional, Sequence, Tuple, Union + +from requests import __version__ as requests_version + +from google.api_core import exceptions as core_exceptions # type: ignore +from google.api_core import gapic_v1 # type: ignore +from google.api_core import path_template # type: ignore +from google.api_core import rest_helpers # type: ignore +from google.api_core import retry as retries # type: ignore +from google.auth import credentials as ga_credentials # type: ignore +from google.auth.transport.requests import AuthorizedSession # type: ignore +from google.longrunning import operations_pb2 # type: ignore +from google.protobuf import empty_pb2 # type: ignore +from google.protobuf import json_format # type: ignore +from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO, OperationsTransport + +OptionalRetry = Union[retries.Retry, object] + +DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( + gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version, + grpc_version=None, + rest_version=requests_version, +) + + +class OperationsRestTransport(OperationsTransport): + """REST backend transport for Operations. + + Manages long-running operations with an API service. + + When an API method normally takes long time to complete, it can be + designed to return [Operation][google.api_core.operations_v1.Operation] to the + client, and the client can use this interface to receive the real + response asynchronously by polling the operation resource, or pass + the operation resource to another API (such as Google Cloud Pub/Sub + API) to receive the response. Any API service that returns + long-running operations should implement the ``Operations`` + interface so developers can have a consistent client experience. + + This class defines the same methods as the primary client, so the + primary client can load the underlying transport implementation + and call it. + + It sends JSON representations of protocol buffers over HTTP/1.1 + """ + + def __init__( + self, + *, + host: str = "longrunning.googleapis.com", + credentials: ga_credentials.Credentials = None, + credentials_file: Optional[str] = None, + scopes: Optional[Sequence[str]] = None, + client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None, + quota_project_id: Optional[str] = None, + client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, + url_scheme: str = "https", + http_options: Optional[Dict] = None, + ) -> None: + """Instantiate the transport. + + Args: + host (Optional[str]): + The hostname to connect to. + credentials (Optional[google.auth.credentials.Credentials]): The + authorization credentials to attach to requests. These + credentials identify the application to the service; if none + are specified, the client will attempt to ascertain the + credentials from the environment. + + credentials_file (Optional[str]): A file with credentials that can + be loaded with :func:`google.auth.load_credentials_from_file`. + This argument is ignored if ``channel`` is provided. + scopes (Optional(Sequence[str])): A list of scopes. This argument is + ignored if ``channel`` is provided. + client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client + certificate to configure mutual TLS HTTP channel. It is ignored + if ``channel`` is provided. + quota_project_id (Optional[str]): An optional project to use for billing + and quota. + client_info (google.api_core.gapic_v1.client_info.ClientInfo): + The client info used to send a user-agent string along with + API requests. If ``None``, then default info will be used. + Generally, you only need to set this if you're developing + your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. + url_scheme: the protocol scheme for the API endpoint. Normally + "https", but for testing or local servers, + "http" can be specified. + http_options: a dictionary of http_options for transcoding, to override + the defaults from operatons.proto. Each method has an entry + with the corresponding http rules as value. + + """ + # Run the base constructor + # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc. + # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the + # credentials object + super().__init__( + host=host, + credentials=credentials, + client_info=client_info, + always_use_jwt_access=always_use_jwt_access, + ) + self._session = AuthorizedSession( + self._credentials, default_host=self.DEFAULT_HOST + ) + if client_cert_source_for_mtls: + self._session.configure_mtls_channel(client_cert_source_for_mtls) + self._prep_wrapped_messages(client_info) + self._http_options = http_options or {} + + def _list_operations( + self, + request: operations_pb2.ListOperationsRequest, + *, + retry: OptionalRetry = gapic_v1.method.DEFAULT, + timeout: Optional[float] = None, + metadata: Sequence[Tuple[str, str]] = (), + ) -> operations_pb2.ListOperationsResponse: + r"""Call the list operations method over HTTP. + + Args: + request (~.operations_pb2.ListOperationsRequest): + The request object. The request message for + [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations]. + + retry (google.api_core.retry.Retry): Designation of what errors, if any, + should be retried. + timeout (float): The timeout for this request. + metadata (Sequence[Tuple[str, str]]): Strings which should be + sent along with the request as metadata. + + Returns: + ~.operations_pb2.ListOperationsResponse: + The response message for + [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations]. + + """ + + http_options = [ + {"method": "get", "uri": "/v1/{name=operations}"}, + ] + if "google.longrunning.Operations.ListOperations" in self._http_options: + http_options = self._http_options[ + "google.longrunning.Operations.ListOperations" + ] + + request_kwargs = json_format.MessageToDict( + request, + preserving_proto_field_name=True, + including_default_value_fields=True, + ) + transcoded_request = path_template.transcode(http_options, **request_kwargs) + + uri = transcoded_request["uri"] + method = transcoded_request["method"] + + # Jsonify the query params + query_params_request = operations_pb2.ListOperationsRequest() + json_format.ParseDict(transcoded_request["query_params"], query_params_request) + query_params = json_format.MessageToDict( + query_params_request, + including_default_value_fields=False, + preserving_proto_field_name=False, + use_integers_for_enums=False, + ) + + # Send the request + headers = dict(metadata) + headers["Content-Type"] = "application/json" + response = getattr(self._session, method)( + "https://{host}{uri}".format(host=self._host, uri=uri), + timeout=timeout, + headers=headers, + params=rest_helpers.flatten_query_params(query_params), + ) + + # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception + # subclass. + if response.status_code >= 400: + raise core_exceptions.from_http_response(response) + + # Return the response + api_response = operations_pb2.ListOperationsResponse() + json_format.Parse(response.content, api_response, ignore_unknown_fields=False) + return api_response + + def _get_operation( + self, + request: operations_pb2.GetOperationRequest, + *, + retry: OptionalRetry = gapic_v1.method.DEFAULT, + timeout: Optional[float] = None, + metadata: Sequence[Tuple[str, str]] = (), + ) -> operations_pb2.Operation: + r"""Call the get operation method over HTTP. + + Args: + request (~.operations_pb2.GetOperationRequest): + The request object. The request message for + [Operations.GetOperation][google.api_core.operations_v1.Operations.GetOperation]. + + retry (google.api_core.retry.Retry): Designation of what errors, if any, + should be retried. + timeout (float): The timeout for this request. + metadata (Sequence[Tuple[str, str]]): Strings which should be + sent along with the request as metadata. + + Returns: + ~.operations_pb2.Operation: + This resource represents a long- + unning operation that is the result of a + network API call. + + """ + + http_options = [ + {"method": "get", "uri": "/v1/{name=operations/**}"}, + ] + if "google.longrunning.Operations.GetOperation" in self._http_options: + http_options = self._http_options[ + "google.longrunning.Operations.GetOperation" + ] + + request_kwargs = json_format.MessageToDict( + request, + preserving_proto_field_name=True, + including_default_value_fields=True, + ) + transcoded_request = path_template.transcode(http_options, **request_kwargs) + + uri = transcoded_request["uri"] + method = transcoded_request["method"] + + # Jsonify the query params + query_params_request = operations_pb2.GetOperationRequest() + json_format.ParseDict(transcoded_request["query_params"], query_params_request) + query_params = json_format.MessageToDict( + query_params_request, + including_default_value_fields=False, + preserving_proto_field_name=False, + use_integers_for_enums=False, + ) + + # Send the request + headers = dict(metadata) + headers["Content-Type"] = "application/json" + response = getattr(self._session, method)( + "https://{host}{uri}".format(host=self._host, uri=uri), + timeout=timeout, + headers=headers, + params=rest_helpers.flatten_query_params(query_params), + ) + + # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception + # subclass. + if response.status_code >= 400: + raise core_exceptions.from_http_response(response) + + # Return the response + api_response = operations_pb2.Operation() + json_format.Parse(response.content, api_response, ignore_unknown_fields=False) + return api_response + + def _delete_operation( + self, + request: operations_pb2.DeleteOperationRequest, + *, + retry: OptionalRetry = gapic_v1.method.DEFAULT, + timeout: Optional[float] = None, + metadata: Sequence[Tuple[str, str]] = (), + ) -> empty_pb2.Empty: + r"""Call the delete operation method over HTTP. + + Args: + request (~.operations_pb2.DeleteOperationRequest): + The request object. The request message for + [Operations.DeleteOperation][google.api_core.operations_v1.Operations.DeleteOperation]. + + retry (google.api_core.retry.Retry): Designation of what errors, if any, + should be retried. + timeout (float): The timeout for this request. + metadata (Sequence[Tuple[str, str]]): Strings which should be + sent along with the request as metadata. + """ + + http_options = [ + {"method": "delete", "uri": "/v1/{name=operations/**}"}, + ] + if "google.longrunning.Operations.DeleteOperation" in self._http_options: + http_options = self._http_options[ + "google.longrunning.Operations.DeleteOperation" + ] + + request_kwargs = json_format.MessageToDict( + request, + preserving_proto_field_name=True, + including_default_value_fields=True, + ) + transcoded_request = path_template.transcode(http_options, **request_kwargs) + + uri = transcoded_request["uri"] + method = transcoded_request["method"] + + # Jsonify the query params + query_params_request = operations_pb2.DeleteOperationRequest() + json_format.ParseDict(transcoded_request["query_params"], query_params_request) + query_params = json_format.MessageToDict( + query_params_request, + including_default_value_fields=False, + preserving_proto_field_name=False, + use_integers_for_enums=False, + ) + + # Send the request + headers = dict(metadata) + headers["Content-Type"] = "application/json" + response = getattr(self._session, method)( + "https://{host}{uri}".format(host=self._host, uri=uri), + timeout=timeout, + headers=headers, + params=rest_helpers.flatten_query_params(query_params), + ) + + # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception + # subclass. + if response.status_code >= 400: + raise core_exceptions.from_http_response(response) + + return empty_pb2.Empty() + + def _cancel_operation( + self, + request: operations_pb2.CancelOperationRequest, + *, + retry: OptionalRetry = gapic_v1.method.DEFAULT, + timeout: Optional[float] = None, + metadata: Sequence[Tuple[str, str]] = (), + ) -> empty_pb2.Empty: + r"""Call the cancel operation method over HTTP. + + Args: + request (~.operations_pb2.CancelOperationRequest): + The request object. The request message for + [Operations.CancelOperation][google.api_core.operations_v1.Operations.CancelOperation]. + + retry (google.api_core.retry.Retry): Designation of what errors, if any, + should be retried. + timeout (float): The timeout for this request. + metadata (Sequence[Tuple[str, str]]): Strings which should be + sent along with the request as metadata. + """ + + http_options = [ + {"method": "post", "uri": "/v1/{name=operations/**}:cancel", "body": "*"}, + ] + if "google.longrunning.Operations.CancelOperation" in self._http_options: + http_options = self._http_options[ + "google.longrunning.Operations.CancelOperation" + ] + + request_kwargs = json_format.MessageToDict( + request, + preserving_proto_field_name=True, + including_default_value_fields=True, + ) + transcoded_request = path_template.transcode(http_options, **request_kwargs) + + # Jsonify the request body + body_request = operations_pb2.CancelOperationRequest() + json_format.ParseDict(transcoded_request["body"], body_request) + body = json_format.MessageToDict( + body_request, + including_default_value_fields=False, + preserving_proto_field_name=False, + use_integers_for_enums=False, + ) + uri = transcoded_request["uri"] + method = transcoded_request["method"] + + # Jsonify the query params + query_params_request = operations_pb2.CancelOperationRequest() + json_format.ParseDict(transcoded_request["query_params"], query_params_request) + query_params = json_format.MessageToDict( + query_params_request, + including_default_value_fields=False, + preserving_proto_field_name=False, + use_integers_for_enums=False, + ) + + # Send the request + headers = dict(metadata) + headers["Content-Type"] = "application/json" + response = getattr(self._session, method)( + "https://{host}{uri}".format(host=self._host, uri=uri), + timeout=timeout, + headers=headers, + params=rest_helpers.flatten_query_params(query_params), + data=body, + ) + + # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception + # subclass. + if response.status_code >= 400: + raise core_exceptions.from_http_response(response) + + return empty_pb2.Empty() + + @property + def list_operations( + self, + ) -> Callable[ + [operations_pb2.ListOperationsRequest], operations_pb2.ListOperationsResponse + ]: + return self._list_operations + + @property + def get_operation( + self, + ) -> Callable[[operations_pb2.GetOperationRequest], operations_pb2.Operation]: + return self._get_operation + + @property + def delete_operation( + self, + ) -> Callable[[operations_pb2.DeleteOperationRequest], empty_pb2.Empty]: + return self._delete_operation + + @property + def cancel_operation( + self, + ) -> Callable[[operations_pb2.CancelOperationRequest], empty_pb2.Empty]: + return self._cancel_operation + + +__all__ = ("OperationsRestTransport",) |