aboutsummaryrefslogtreecommitdiff
path: root/tests/unit/test_operation.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unit/test_operation.py')
-rw-r--r--tests/unit/test_operation.py223
1 files changed, 223 insertions, 0 deletions
diff --git a/tests/unit/test_operation.py b/tests/unit/test_operation.py
new file mode 100644
index 0000000..1d765cc
--- /dev/null
+++ b/tests/unit/test_operation.py
@@ -0,0 +1,223 @@
+# Copyright 2017, Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import mock
+
+from google.api_core import operation
+from google.api_core import operations_v1
+from google.longrunning import operations_pb2
+from google.protobuf import struct_pb2
+from google.rpc import code_pb2
+from google.rpc import status_pb2
+
+TEST_OPERATION_NAME = 'test/operation'
+
+
+def make_operation_proto(
+ name=TEST_OPERATION_NAME, metadata=None, response=None,
+ error=None, **kwargs):
+ operation_proto = operations_pb2.Operation(
+ name=name, **kwargs)
+
+ if metadata is not None:
+ operation_proto.metadata.Pack(metadata)
+
+ if response is not None:
+ operation_proto.response.Pack(response)
+
+ if error is not None:
+ operation_proto.error.CopyFrom(error)
+
+ return operation_proto
+
+
+def make_operation_future(client_operations_responses=None):
+ if client_operations_responses is None:
+ client_operations_responses = [make_operation_proto()]
+
+ refresh = mock.Mock(
+ spec=['__call__'], side_effect=client_operations_responses)
+ refresh.responses = client_operations_responses
+ cancel = mock.Mock(spec=['__call__'])
+ operation_future = operation.Operation(
+ client_operations_responses[0],
+ refresh,
+ cancel,
+ result_type=struct_pb2.Struct,
+ metadata_type=struct_pb2.Struct)
+
+ return operation_future, refresh, cancel
+
+
+def test_constructor():
+ future, refresh, _ = make_operation_future()
+
+ assert future.operation == refresh.responses[0]
+ assert future.operation.done is False
+ assert future.operation.name == TEST_OPERATION_NAME
+ assert future.metadata is None
+ assert future.running()
+
+
+def test_metadata():
+ expected_metadata = struct_pb2.Struct()
+ future, _, _ = make_operation_future(
+ [make_operation_proto(metadata=expected_metadata)])
+
+ assert future.metadata == expected_metadata
+
+
+def test_cancellation():
+ responses = [
+ make_operation_proto(),
+ # Second response indicates that the operation was cancelled.
+ make_operation_proto(
+ done=True,
+ error=status_pb2.Status(code=code_pb2.CANCELLED))]
+ future, _, cancel = make_operation_future(responses)
+
+ assert future.cancel()
+ assert future.cancelled()
+ cancel.assert_called_once_with()
+
+ # Cancelling twice should have no effect.
+ assert not future.cancel()
+ cancel.assert_called_once_with()
+
+
+def test_result():
+ expected_result = struct_pb2.Struct()
+ responses = [
+ make_operation_proto(),
+ # Second operation response includes the result.
+ make_operation_proto(done=True, response=expected_result)]
+ future, _, _ = make_operation_future(responses)
+
+ result = future.result()
+
+ assert result == expected_result
+ assert future.done()
+
+
+def test_exception():
+ expected_exception = status_pb2.Status(message='meep')
+ responses = [
+ make_operation_proto(),
+ # Second operation response includes the error.
+ make_operation_proto(done=True, error=expected_exception)]
+ future, _, _ = make_operation_future(responses)
+
+ exception = future.exception()
+
+ assert expected_exception.message in '{!r}'.format(exception)
+
+
+def test_unexpected_result():
+ responses = [
+ make_operation_proto(),
+ # Second operation response is done, but has not error or response.
+ make_operation_proto(done=True)]
+ future, _, _ = make_operation_future(responses)
+
+ exception = future.exception()
+
+ assert 'Unexpected state' in '{!r}'.format(exception)
+
+
+def test__refresh_http():
+ api_request = mock.Mock(
+ return_value={'name': TEST_OPERATION_NAME, 'done': True})
+
+ result = operation._refresh_http(api_request, TEST_OPERATION_NAME)
+
+ assert result.name == TEST_OPERATION_NAME
+ assert result.done is True
+ api_request.assert_called_once_with(
+ method='GET', path='operations/{}'.format(TEST_OPERATION_NAME))
+
+
+def test__cancel_http():
+ api_request = mock.Mock()
+
+ operation._cancel_http(api_request, TEST_OPERATION_NAME)
+
+ api_request.assert_called_once_with(
+ method='POST', path='operations/{}:cancel'.format(TEST_OPERATION_NAME))
+
+
+def test_from_http_json():
+ operation_json = {'name': TEST_OPERATION_NAME, 'done': True}
+ api_request = mock.sentinel.api_request
+
+ future = operation.from_http_json(
+ operation_json, api_request, struct_pb2.Struct,
+ metadata_type=struct_pb2.Struct)
+
+ assert future._result_type == struct_pb2.Struct
+ assert future._metadata_type == struct_pb2.Struct
+ assert future.operation.name == TEST_OPERATION_NAME
+ assert future.done
+
+
+def test__refresh_grpc():
+ operations_stub = mock.Mock(spec=['GetOperation'])
+ expected_result = make_operation_proto(done=True)
+ operations_stub.GetOperation.return_value = expected_result
+
+ result = operation._refresh_grpc(operations_stub, TEST_OPERATION_NAME)
+
+ assert result == expected_result
+ expected_request = operations_pb2.GetOperationRequest(
+ name=TEST_OPERATION_NAME)
+ operations_stub.GetOperation.assert_called_once_with(expected_request)
+
+
+def test__cancel_grpc():
+ operations_stub = mock.Mock(spec=['CancelOperation'])
+
+ operation._cancel_grpc(operations_stub, TEST_OPERATION_NAME)
+
+ expected_request = operations_pb2.CancelOperationRequest(
+ name=TEST_OPERATION_NAME)
+ operations_stub.CancelOperation.assert_called_once_with(expected_request)
+
+
+def test_from_grpc():
+ operation_proto = make_operation_proto(done=True)
+ operations_stub = mock.sentinel.operations_stub
+
+ future = operation.from_grpc(
+ operation_proto, operations_stub, struct_pb2.Struct,
+ metadata_type=struct_pb2.Struct)
+
+ assert future._result_type == struct_pb2.Struct
+ assert future._metadata_type == struct_pb2.Struct
+ assert future.operation.name == TEST_OPERATION_NAME
+ assert future.done
+
+
+def test_from_gapic():
+ operation_proto = make_operation_proto(done=True)
+ operations_client = mock.create_autospec(
+ operations_v1.OperationsClient, instance=True)
+
+ future = operation.from_gapic(
+ operation_proto, operations_client, struct_pb2.Struct,
+ metadata_type=struct_pb2.Struct)
+
+ assert future._result_type == struct_pb2.Struct
+ assert future._metadata_type == struct_pb2.Struct
+ assert future.operation.name == TEST_OPERATION_NAME
+ assert future.done