# Copyright 2017, Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import mock from google.api_core import exceptions from google.api_core import operation from google.api_core import operations_v1 from google.api_core import retry from google.longrunning import operations_pb2 from google.protobuf import struct_pb2 from google.rpc import code_pb2 from google.rpc import status_pb2 TEST_OPERATION_NAME = "test/operation" def make_operation_proto( name=TEST_OPERATION_NAME, metadata=None, response=None, error=None, **kwargs ): operation_proto = operations_pb2.Operation(name=name, **kwargs) if metadata is not None: operation_proto.metadata.Pack(metadata) if response is not None: operation_proto.response.Pack(response) if error is not None: operation_proto.error.CopyFrom(error) return operation_proto def make_operation_future(client_operations_responses=None): if client_operations_responses is None: client_operations_responses = [make_operation_proto()] refresh = mock.Mock(spec=["__call__"], side_effect=client_operations_responses) refresh.responses = client_operations_responses cancel = mock.Mock(spec=["__call__"]) operation_future = operation.Operation( client_operations_responses[0], refresh, cancel, result_type=struct_pb2.Struct, metadata_type=struct_pb2.Struct, ) return operation_future, refresh, cancel def test_constructor(): future, refresh, _ = make_operation_future() assert future.operation == refresh.responses[0] assert future.operation.done is False assert future.operation.name == TEST_OPERATION_NAME assert future.metadata is None assert future.running() def test_metadata(): expected_metadata = struct_pb2.Struct() future, _, _ = make_operation_future( [make_operation_proto(metadata=expected_metadata)] ) assert future.metadata == expected_metadata def test_cancellation(): responses = [ make_operation_proto(), # Second response indicates that the operation was cancelled. make_operation_proto( done=True, error=status_pb2.Status(code=code_pb2.CANCELLED) ), ] future, _, cancel = make_operation_future(responses) assert future.cancel() assert future.cancelled() cancel.assert_called_once_with() # Cancelling twice should have no effect. assert not future.cancel() cancel.assert_called_once_with() def test_result(): expected_result = struct_pb2.Struct() responses = [ make_operation_proto(), # Second operation response includes the result. make_operation_proto(done=True, response=expected_result), ] future, _, _ = make_operation_future(responses) result = future.result() assert result == expected_result assert future.done() def test_done_w_retry(): RETRY_PREDICATE = retry.if_exception_type(exceptions.TooManyRequests) test_retry = retry.Retry(predicate=RETRY_PREDICATE) expected_result = struct_pb2.Struct() responses = [ make_operation_proto(), # Second operation response includes the result. make_operation_proto(done=True, response=expected_result), ] future, _, _ = make_operation_future(responses) future._refresh = mock.Mock() future.done(retry=test_retry) future._refresh.assert_called_once_with(retry=test_retry) def test_exception(): expected_exception = status_pb2.Status(message="meep") responses = [ make_operation_proto(), # Second operation response includes the error. make_operation_proto(done=True, error=expected_exception), ] future, _, _ = make_operation_future(responses) exception = future.exception() assert expected_exception.message in "{!r}".format(exception) def test_exception_with_error_code(): expected_exception = status_pb2.Status(message="meep", code=5) responses = [ make_operation_proto(), # Second operation response includes the error. make_operation_proto(done=True, error=expected_exception), ] future, _, _ = make_operation_future(responses) exception = future.exception() assert expected_exception.message in "{!r}".format(exception) # Status Code 5 maps to Not Found # https://developers.google.com/maps-booking/reference/grpc-api/status_codes assert isinstance(exception, exceptions.NotFound) def test_unexpected_result(): responses = [ make_operation_proto(), # Second operation response is done, but has not error or response. make_operation_proto(done=True), ] future, _, _ = make_operation_future(responses) exception = future.exception() assert "Unexpected state" in "{!r}".format(exception) def test__refresh_http(): json_response = {"name": TEST_OPERATION_NAME, "done": True} api_request = mock.Mock(return_value=json_response) result = operation._refresh_http(api_request, TEST_OPERATION_NAME) assert isinstance(result, operations_pb2.Operation) assert result.name == TEST_OPERATION_NAME assert result.done is True api_request.assert_called_once_with( method="GET", path="operations/{}".format(TEST_OPERATION_NAME) ) def test__refresh_http_w_retry(): json_response = {"name": TEST_OPERATION_NAME, "done": True} api_request = mock.Mock() retry = mock.Mock() retry.return_value.return_value = json_response result = operation._refresh_http(api_request, TEST_OPERATION_NAME, retry=retry) assert isinstance(result, operations_pb2.Operation) assert result.name == TEST_OPERATION_NAME assert result.done is True api_request.assert_not_called() retry.assert_called_once_with(api_request) retry.return_value.assert_called_once_with( method="GET", path="operations/{}".format(TEST_OPERATION_NAME) ) def test__cancel_http(): api_request = mock.Mock() operation._cancel_http(api_request, TEST_OPERATION_NAME) api_request.assert_called_once_with( method="POST", path="operations/{}:cancel".format(TEST_OPERATION_NAME) ) def test_from_http_json(): operation_json = {"name": TEST_OPERATION_NAME, "done": True} api_request = mock.sentinel.api_request future = operation.from_http_json( operation_json, api_request, struct_pb2.Struct, metadata_type=struct_pb2.Struct ) assert future._result_type == struct_pb2.Struct assert future._metadata_type == struct_pb2.Struct assert future.operation.name == TEST_OPERATION_NAME assert future.done def test__refresh_grpc(): operations_stub = mock.Mock(spec=["GetOperation"]) expected_result = make_operation_proto(done=True) operations_stub.GetOperation.return_value = expected_result result = operation._refresh_grpc(operations_stub, TEST_OPERATION_NAME) assert result == expected_result expected_request = operations_pb2.GetOperationRequest(name=TEST_OPERATION_NAME) operations_stub.GetOperation.assert_called_once_with(expected_request) def test__refresh_grpc_w_retry(): operations_stub = mock.Mock(spec=["GetOperation"]) expected_result = make_operation_proto(done=True) retry = mock.Mock() retry.return_value.return_value = expected_result result = operation._refresh_grpc(operations_stub, TEST_OPERATION_NAME, retry=retry) assert result == expected_result expected_request = operations_pb2.GetOperationRequest(name=TEST_OPERATION_NAME) operations_stub.GetOperation.assert_not_called() retry.assert_called_once_with(operations_stub.GetOperation) retry.return_value.assert_called_once_with(expected_request) def test__cancel_grpc(): operations_stub = mock.Mock(spec=["CancelOperation"]) operation._cancel_grpc(operations_stub, TEST_OPERATION_NAME) expected_request = operations_pb2.CancelOperationRequest(name=TEST_OPERATION_NAME) operations_stub.CancelOperation.assert_called_once_with(expected_request) def test_from_grpc(): operation_proto = make_operation_proto(done=True) operations_stub = mock.sentinel.operations_stub future = operation.from_grpc( operation_proto, operations_stub, struct_pb2.Struct, metadata_type=struct_pb2.Struct, grpc_metadata=[("x-goog-request-params", "foo")], ) assert future._result_type == struct_pb2.Struct assert future._metadata_type == struct_pb2.Struct assert future.operation.name == TEST_OPERATION_NAME assert future.done assert future._refresh.keywords["metadata"] == [("x-goog-request-params", "foo")] assert future._cancel.keywords["metadata"] == [("x-goog-request-params", "foo")] def test_from_gapic(): operation_proto = make_operation_proto(done=True) operations_client = mock.create_autospec( operations_v1.OperationsClient, instance=True ) future = operation.from_gapic( operation_proto, operations_client, struct_pb2.Struct, metadata_type=struct_pb2.Struct, grpc_metadata=[("x-goog-request-params", "foo")], ) assert future._result_type == struct_pb2.Struct assert future._metadata_type == struct_pb2.Struct assert future.operation.name == TEST_OPERATION_NAME assert future.done assert future._refresh.keywords["metadata"] == [("x-goog-request-params", "foo")] assert future._cancel.keywords["metadata"] == [("x-goog-request-params", "foo")] def test_deserialize(): op = make_operation_proto(name="foobarbaz") serialized = op.SerializeToString() deserialized_op = operation.Operation.deserialize(serialized) assert op.name == deserialized_op.name assert type(op) is type(deserialized_op)