aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorjuerg <juerg@google.com>2023-03-27 02:53:28 -0700
committerCopybara-Service <copybara-worker@google.com>2023-03-27 02:54:42 -0700
commit8b7b3a759882ed067ce1e0d90ba1f2baad78d9dc (patch)
treedf3b579cd9427e96a4b56d5fc4d0aed6eebbdad4 /python
parent8d010213d6bf885cbcb42b615a204543de2881b0 (diff)
downloadtink-8b7b3a759882ed067ce1e0d90ba1f2baad78d9dc.tar.gz
Refactor tests for AWS KMS integration in Python.
Make them similar to the GCP KMS integration tests. Also, move the test of the register function into the integration tests, and check that register really worked. PiperOrigin-RevId: 519666171
Diffstat (limited to 'python')
-rw-r--r--python/tink/integration/awskms/BUILD.bazel3
-rw-r--r--python/tink/integration/awskms/_aws_kms_client_test.py38
-rw-r--r--python/tink/integration/awskms/_aws_kms_integration_test.py67
3 files changed, 76 insertions, 32 deletions
diff --git a/python/tink/integration/awskms/BUILD.bazel b/python/tink/integration/awskms/BUILD.bazel
index 8eb8b8186..651315741 100644
--- a/python/tink/integration/awskms/BUILD.bazel
+++ b/python/tink/integration/awskms/BUILD.bazel
@@ -35,6 +35,7 @@ py_test(
srcs_version = "PY3",
deps = [
":awskms",
+ "//tink:tink_python",
"//tink/testing:helper",
requirement("absl-py"),
],
@@ -52,6 +53,8 @@ py_test(
tags = ["manual"],
deps = [
":awskms",
+ "//tink:tink_python",
+ "//tink/aead",
"//tink/testing:helper",
requirement("absl-py"),
],
diff --git a/python/tink/integration/awskms/_aws_kms_client_test.py b/python/tink/integration/awskms/_aws_kms_client_test.py
index 24861bb9b..5a0901b8f 100644
--- a/python/tink/integration/awskms/_aws_kms_client_test.py
+++ b/python/tink/integration/awskms/_aws_kms_client_test.py
@@ -17,37 +17,47 @@ import os
from absl.testing import absltest
-from tink import core
+import tink
from tink.integration import awskms
from tink.testing import helper
CREDENTIAL_PATH = os.path.join(helper.tink_py_testdata_path(),
'aws/credentials.ini')
-KEY_URI = 'aws-kms://arn:aws:kms:us-east-2:235739564943:key/3ee50705-5a82-4f5b-9753-05c4f473922f'
-BAD_KEY_URI = 'gcp-kms://projects/tink-test-infrastructure/locations/global/keyRings/unit-and-integration-testing/cryptoKeys/aead-key'
+KEY_URI = ('aws-kms://arn:aws:kms:us-east-2:235739564943:key/'
+ '3ee50705-5a82-4f5b-9753-05c4f473922f')
+KEY_URI_2 = ('aws-kms://arn:aws:kms:us-east-2:235739564943:key/'
+ 'b3ca2efd-a8fb-47f2-b541-7e20f8c5cd11')
+GCP_KEY_URI = ('gcp-kms://projects/tink-test-infrastructure/locations/global/'
+ 'keyRings/unit-and-integration-testing/cryptoKeys/aead-key')
class AwsKmsClientTest(absltest.TestCase):
- def test_client_generation(self):
+ def test_client_bound_to_key_uri(self):
aws_client = awskms.AwsKmsClient(KEY_URI, CREDENTIAL_PATH)
- self.assertNotEqual(aws_client, None)
- def test_wrong_key_uri(self):
- with self.assertRaises(core.TinkError):
- awskms.AwsKmsClient(BAD_KEY_URI, CREDENTIAL_PATH)
+ self.assertEqual(aws_client.does_support(KEY_URI), True)
+ self.assertEqual(aws_client.does_support(KEY_URI_2), False)
+ self.assertEqual(aws_client.does_support(GCP_KEY_URI), False)
- def test_client_registration(self):
+ def test_client_not_bound_to_key_uri(self):
aws_client = awskms.AwsKmsClient('', CREDENTIAL_PATH)
- aws_client.register_client('', CREDENTIAL_PATH)
- def test_client_not_bound(self):
- gcp_key1 = 'gcp-kms://projects/someProject/.../cryptoKeys/key1'
+ self.assertEqual(aws_client.does_support(KEY_URI), True)
+ self.assertEqual(aws_client.does_support(KEY_URI_2), True)
+ self.assertEqual(aws_client.does_support(GCP_KEY_URI), False)
- aws_client = awskms.AwsKmsClient(KEY_URI, CREDENTIAL_PATH)
+ def test_wrong_key_uri(self):
+ with self.assertRaises(tink.TinkError):
+ awskms.AwsKmsClient(GCP_KEY_URI, CREDENTIAL_PATH)
+ def test_client_empty_key_uri(self):
+ aws_client = awskms.AwsKmsClient('', CREDENTIAL_PATH)
self.assertEqual(aws_client.does_support(KEY_URI), True)
- self.assertEqual(aws_client.does_support(gcp_key1), False)
+
+ def test_client_invalid_path(self):
+ with self.assertRaises(ValueError):
+ awskms.AwsKmsClient('', CREDENTIAL_PATH + 'corrupted')
def test_wrong_credentials_path(self):
with self.assertRaises(ValueError):
diff --git a/python/tink/integration/awskms/_aws_kms_integration_test.py b/python/tink/integration/awskms/_aws_kms_integration_test.py
index c1c369cf6..687f1989d 100644
--- a/python/tink/integration/awskms/_aws_kms_integration_test.py
+++ b/python/tink/integration/awskms/_aws_kms_integration_test.py
@@ -17,7 +17,8 @@ import os
from absl.testing import absltest
-from tink import core
+import tink
+from tink import aead
from tink.integration import awskms
from tink.testing import helper
@@ -25,32 +26,40 @@ CREDENTIAL_PATH = os.path.join(helper.tink_py_testdata_path(),
'aws/credentials.ini')
BAD_CREDENTIALS_PATH = os.path.join(helper.tink_py_testdata_path(),
'aws/credentials_bad.ini')
-KEY_URI = 'aws-kms://arn:aws:kms:us-east-2:235739564943:key/3ee50705-5a82-4f5b-9753-05c4f473922f'
-BAD_KEY_URI = 'gcp-kms://projects/tink-test-infrastructure/locations/global/keyRings/unit-and-integration-testing/cryptoKeys/aead-key'
+KEY_URI = ('aws-kms://arn:aws:kms:us-east-2:235739564943:key/'
+ '3ee50705-5a82-4f5b-9753-05c4f473922f')
+KEY_URI_2 = ('aws-kms://arn:aws:kms:us-east-2:235739564943:key/'
+ 'b3ca2efd-a8fb-47f2-b541-7e20f8c5cd11')
+GCP_KEY_URI = ('gcp-kms://projects/tink-test-infrastructure/locations/global/'
+ 'keyRings/unit-and-integration-testing/cryptoKeys/aead-key')
+
+
+def setUpModule():
+ aead.register()
class AwsKmsAeadTest(absltest.TestCase):
def test_encrypt_decrypt(self):
aws_client = awskms.AwsKmsClient(KEY_URI, CREDENTIAL_PATH)
- aead = aws_client.get_aead(KEY_URI)
+ aws_aead = aws_client.get_aead(KEY_URI)
plaintext = b'hello'
associated_data = b'world'
- ciphertext = aead.encrypt(plaintext, associated_data)
- self.assertEqual(plaintext, aead.decrypt(ciphertext, associated_data))
+ ciphertext = aws_aead.encrypt(plaintext, associated_data)
+ self.assertEqual(plaintext, aws_aead.decrypt(ciphertext, associated_data))
plaintext = b'hello'
- ciphertext = aead.encrypt(plaintext, b'')
- self.assertEqual(plaintext, aead.decrypt(ciphertext, b''))
+ ciphertext = aws_aead.encrypt(plaintext, b'')
+ self.assertEqual(plaintext, aws_aead.decrypt(ciphertext, b''))
def test_corrupted_ciphertext(self):
aws_client = awskms.AwsKmsClient(KEY_URI, CREDENTIAL_PATH)
- aead = aws_client.get_aead(KEY_URI)
+ aws_aead = aws_client.get_aead(KEY_URI)
plaintext = b'helloworld'
- ciphertext = aead.encrypt(plaintext, b'')
- self.assertEqual(plaintext, aead.decrypt(ciphertext, b''))
+ ciphertext = aws_aead.encrypt(plaintext, b'')
+ self.assertEqual(plaintext, aws_aead.decrypt(ciphertext, b''))
# Corrupt each byte once and check that decryption fails
# NOTE: Skipping two bytes as they are malleable
@@ -58,22 +67,44 @@ class AwsKmsAeadTest(absltest.TestCase):
tmp_ciphertext = list(ciphertext)
tmp_ciphertext[byte_idx] ^= 1
corrupted_ciphertext = bytes(tmp_ciphertext)
- with self.assertRaises(core.TinkError):
- aead.decrypt(corrupted_ciphertext, b'')
+ with self.assertRaises(tink.TinkError):
+ aws_aead.decrypt(corrupted_ciphertext, b'')
def test_encrypt_with_bad_uri(self):
- with self.assertRaises(core.TinkError):
+ with self.assertRaises(tink.TinkError):
aws_client = awskms.AwsKmsClient(KEY_URI, CREDENTIAL_PATH)
- aws_client.get_aead(BAD_KEY_URI)
+ aws_client.get_aead(GCP_KEY_URI)
def test_encrypt_with_bad_credentials(self):
aws_client = awskms.AwsKmsClient(KEY_URI, BAD_CREDENTIALS_PATH)
- aead = aws_client.get_aead(KEY_URI)
+ aws_aead = aws_client.get_aead(KEY_URI)
plaintext = b'hello'
associated_data = b'world'
- with self.assertRaises(core.TinkError):
- aead.encrypt(plaintext, associated_data)
+ with self.assertRaises(tink.TinkError):
+ aws_aead.encrypt(plaintext, associated_data)
+
+ def test_client_registration(self):
+ # Register AWS KMS Client bound to KEY_URI.
+ awskms.AwsKmsClient.register_client(KEY_URI, CREDENTIAL_PATH)
+
+ # Create a keyset handle for KEY_URI and use it.
+ handle = tink.new_keyset_handle(
+ aead.aead_key_templates.create_kms_aead_key_template(KEY_URI)
+ )
+ aws_aead = handle.primitive(aead.Aead)
+ ciphertext = aws_aead.encrypt(b'plaintext', b'associated_data')
+ self.assertEqual(
+ b'plaintext', aws_aead.decrypt(ciphertext, b'associated_data')
+ )
+
+ # It fails for any other key URI.
+ with self.assertRaises(tink.TinkError):
+ handle2 = tink.new_keyset_handle(
+ aead.aead_key_templates.create_kms_aead_key_template(KEY_URI_2)
+ )
+ gcp_aead = handle2.primitive(aead.Aead)
+ gcp_aead.encrypt(b'plaintext', b'associated_data')
if __name__ == '__main__':