author | juerg <juerg@google.com> | 2023-03-27 02:53:28 -0700 |
---|---|---|
committer | Copybara-Service <copybara-worker@google.com> | 2023-03-27 02:54:42 -0700 |
commit | 8b7b3a759882ed067ce1e0d90ba1f2baad78d9dc (patch) | |
tree | df3b579cd9427e96a4b56d5fc4d0aed6eebbdad4 /python | |
parent | 8d010213d6bf885cbcb42b615a204543de2881b0 (diff) | |
download | tink-8b7b3a759882ed067ce1e0d90ba1f2baad78d9dc.tar.gz |
Refactor tests for AWS KMS integration in Python.
Make them similar to the GCP KMS integration tests.
Also, move the test of the register function into the integration tests,
and check that register really worked.
PiperOrigin-RevId: 519666171
Diffstat (limited to 'python')
-rw-r--r-- | python/tink/integration/awskms/BUILD.bazel | 3 | ||||
-rw-r--r-- | python/tink/integration/awskms/_aws_kms_client_test.py | 38 | ||||
-rw-r--r-- | python/tink/integration/awskms/_aws_kms_integration_test.py | 67 |
3 files changed, 76 insertions, 32 deletions
diff --git a/python/tink/integration/awskms/BUILD.bazel b/python/tink/integration/awskms/BUILD.bazel
index 8eb8b8186..651315741 100644
--- a/python/tink/integration/awskms/BUILD.bazel
+++ b/python/tink/integration/awskms/BUILD.bazel
@@ -35,6 +35,7 @@ py_test(
 srcs_version = "PY3",
 deps = [
 ":awskms",
+ "//tink:tink_python",
 "//tink/testing:helper",
 requirement("absl-py"),
 ],
@@ -52,6 +53,8 @@ py_test(
 tags = ["manual"],
 deps = [
 ":awskms",
+ "//tink:tink_python",
+ "//tink/aead",
 "//tink/testing:helper",
 requirement("absl-py"),
 ],
diff --git a/python/tink/integration/awskms/_aws_kms_client_test.py b/python/tink/integration/awskms/_aws_kms_client_test.py
index 24861bb9b..5a0901b8f 100644
--- a/python/tink/integration/awskms/_aws_kms_client_test.py
+++ b/python/tink/integration/awskms/_aws_kms_client_test.py
@@ -17,37 +17,47 @@ import os from absl.testing import absltest -from tink import core +import tink from tink.integration import awskms from tink.testing import helper CREDENTIAL_PATH = os.path.join(helper.tink_py_testdata_path(), 'aws/credentials.ini') -KEY_URI = 'aws-kms://arn:aws:kms:us-east-2:235739564943:key/3ee50705-5a82-4f5b-9753-05c4f473922f' -BAD_KEY_URI = 'gcp-kms://projects/tink-test-infrastructure/locations/global/keyRings/unit-and-integration-testing/cryptoKeys/aead-key' +KEY_URI = ('aws-kms://arn:aws:kms:us-east-2:235739564943:key/' + '3ee50705-5a82-4f5b-9753-05c4f473922f') +KEY_URI_2 = ('aws-kms://arn:aws:kms:us-east-2:235739564943:key/' + 'b3ca2efd-a8fb-47f2-b541-7e20f8c5cd11') +GCP_KEY_URI = ('gcp-kms://projects/tink-test-infrastructure/locations/global/' + 'keyRings/unit-and-integration-testing/cryptoKeys/aead-key') class AwsKmsClientTest(absltest.TestCase): - def test_client_generation(self): + def test_client_bound_to_key_uri(self): aws_client = awskms.AwsKmsClient(KEY_URI, CREDENTIAL_PATH) - self.assertNotEqual(aws_client, None) - def test_wrong_key_uri(self): - with self.assertRaises(core.TinkError): - awskms.AwsKmsClient(BAD_KEY_URI, CREDENTIAL_PATH) + self.assertEqual(aws_client.does_support(KEY_URI), True) + self.assertEqual(aws_client.does_support(KEY_URI_2), False) + self.assertEqual(aws_client.does_support(GCP_KEY_URI), False) - def test_client_registration(self): + def test_client_not_bound_to_key_uri(self): aws_client = awskms.AwsKmsClient('', CREDENTIAL_PATH) - aws_client.register_client('', CREDENTIAL_PATH) - def test_client_not_bound(self): - gcp_key1 = 'gcp-kms://projects/someProject/.../cryptoKeys/key1' + self.assertEqual(aws_client.does_support(KEY_URI), True) + self.assertEqual(aws_client.does_support(KEY_URI_2), True) + self.assertEqual(aws_client.does_support(GCP_KEY_URI), False) - aws_client = awskms.AwsKmsClient(KEY_URI, CREDENTIAL_PATH) + def test_wrong_key_uri(self): + with self.assertRaises(tink.TinkError): + 
awskms.AwsKmsClient(GCP_KEY_URI, CREDENTIAL_PATH) + def test_client_empty_key_uri(self): + aws_client = awskms.AwsKmsClient('', CREDENTIAL_PATH) self.assertEqual(aws_client.does_support(KEY_URI), True) - self.assertEqual(aws_client.does_support(gcp_key1), False) + + def test_client_invalid_path(self): + with self.assertRaises(ValueError): + awskms.AwsKmsClient('', CREDENTIAL_PATH + 'corrupted') def test_wrong_credentials_path(self): with self.assertRaises(ValueError): diff --git a/python/tink/integration/awskms/_aws_kms_integration_test.py b/python/tink/integration/awskms/_aws_kms_integration_test.py index c1c369cf6..687f1989d 100644 --- a/python/tink/integration/awskms/_aws_kms_integration_test.py +++ b/python/tink/integration/awskms/_aws_kms_integration_test.py @@ -17,7 +17,8 @@ import os from absl.testing import absltest -from tink import core +import tink +from tink import aead from tink.integration import awskms from tink.testing import helper 
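Review bot: `test_client_empty_key_uri` repeats the first assertion of `test_client_not_bound_to_key_uri` on an identically constructed client, and `test_client_invalid_path` appears to overlap the existing `test_wrong_credentials_path` (whose body continues outside the hunk), so neither seems to add coverage. Since `does_support` is what keeps a bound client from being used with a different key, that slot would be better spent on a near-miss URI in `test_client_bound_to_key_uri`, e.g. `self.assertFalse(aws_client.does_support(KEY_URI + 'x'))`; whether the match is exact or prefix-based is not visible from this hunk. The boolean checks would also read more directly as `assertTrue(...)` / `assertFalse(...)` than as `assertEqual(..., True)`.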
@@ -25,32 +26,40 @@ CREDENTIAL_PATH = os.path.join(helper.tink_py_testdata_path(), 'aws/credentials.ini') BAD_CREDENTIALS_PATH = os.path.join(helper.tink_py_testdata_path(), 'aws/credentials_bad.ini') -KEY_URI = 'aws-kms://arn:aws:kms:us-east-2:235739564943:key/3ee50705-5a82-4f5b-9753-05c4f473922f' -BAD_KEY_URI = 'gcp-kms://projects/tink-test-infrastructure/locations/global/keyRings/unit-and-integration-testing/cryptoKeys/aead-key' +KEY_URI = ('aws-kms://arn:aws:kms:us-east-2:235739564943:key/' + '3ee50705-5a82-4f5b-9753-05c4f473922f') +KEY_URI_2 = ('aws-kms://arn:aws:kms:us-east-2:235739564943:key/' + 'b3ca2efd-a8fb-47f2-b541-7e20f8c5cd11') +GCP_KEY_URI = ('gcp-kms://projects/tink-test-infrastructure/locations/global/' + 'keyRings/unit-and-integration-testing/cryptoKeys/aead-key') + + +def setUpModule(): + aead.register() class AwsKmsAeadTest(absltest.TestCase): def test_encrypt_decrypt(self): aws_client = awskms.AwsKmsClient(KEY_URI, CREDENTIAL_PATH) - aead = aws_client.get_aead(KEY_URI) + aws_aead = aws_client.get_aead(KEY_URI) plaintext = b'hello' associated_data = b'world' - ciphertext = aead.encrypt(plaintext, associated_data) - self.assertEqual(plaintext, aead.decrypt(ciphertext, associated_data)) + ciphertext = aws_aead.encrypt(plaintext, associated_data) + self.assertEqual(plaintext, aws_aead.decrypt(ciphertext, associated_data)) plaintext = b'hello' - ciphertext = aead.encrypt(plaintext, b'') - self.assertEqual(plaintext, aead.decrypt(ciphertext, b'')) + ciphertext = aws_aead.encrypt(plaintext, b'') + self.assertEqual(plaintext, aws_aead.decrypt(ciphertext, b'')) def test_corrupted_ciphertext(self): aws_client = awskms.AwsKmsClient(KEY_URI, CREDENTIAL_PATH) - aead = aws_client.get_aead(KEY_URI) + aws_aead = aws_client.get_aead(KEY_URI) plaintext = b'helloworld' - ciphertext = aead.encrypt(plaintext, b'') - self.assertEqual(plaintext, aead.decrypt(ciphertext, b'')) + ciphertext = aws_aead.encrypt(plaintext, b'') + self.assertEqual(plaintext, 
aws_aead.decrypt(ciphertext, b'')) # Corrupt each byte once and check that decryption fails # NOTE: Skipping two bytes as they are malleable @@ -58,22 +67,44 @@ class AwsKmsAeadTest(absltest.TestCase): tmp_ciphertext = list(ciphertext) tmp_ciphertext[byte_idx] ^= 1 corrupted_ciphertext = bytes(tmp_ciphertext) - with self.assertRaises(core.TinkError): - aead.decrypt(corrupted_ciphertext, b'') + with self.assertRaises(tink.TinkError): + aws_aead.decrypt(corrupted_ciphertext, b'') def test_encrypt_with_bad_uri(self): - with self.assertRaises(core.TinkError): + with self.assertRaises(tink.TinkError): aws_client = awskms.AwsKmsClient(KEY_URI, CREDENTIAL_PATH) - aws_client.get_aead(BAD_KEY_URI) + aws_client.get_aead(GCP_KEY_URI) def test_encrypt_with_bad_credentials(self): aws_client = awskms.AwsKmsClient(KEY_URI, BAD_CREDENTIALS_PATH) - aead = aws_client.get_aead(KEY_URI) + aws_aead = aws_client.get_aead(KEY_URI) plaintext = b'hello' associated_data = b'world' - with self.assertRaises(core.TinkError): - aead.encrypt(plaintext, associated_data) + with self.assertRaises(tink.TinkError): + aws_aead.encrypt(plaintext, associated_data) + + def test_client_registration(self): + # Register AWS KMS Client bound to KEY_URI. + awskms.AwsKmsClient.register_client(KEY_URI, CREDENTIAL_PATH) + + # Create a keyset handle for KEY_URI and use it. + handle = tink.new_keyset_handle( + aead.aead_key_templates.create_kms_aead_key_template(KEY_URI) + ) + aws_aead = handle.primitive(aead.Aead) + ciphertext = aws_aead.encrypt(b'plaintext', b'associated_data') + self.assertEqual( + b'plaintext', aws_aead.decrypt(ciphertext, b'associated_data') + ) + + # It fails for any other key URI. + with self.assertRaises(tink.TinkError): + handle2 = tink.new_keyset_handle( + aead.aead_key_templates.create_kms_aead_key_template(KEY_URI_2) + ) + gcp_aead = handle2.primitive(aead.Aead) + gcp_aead.encrypt(b'plaintext', b'associated_data') if __name__ == '__main__': |
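Review bot: The negative check at the end of `test_client_registration` wraps `new_keyset_handle`, `primitive` and `encrypt` in a single `assertRaises(tink.TinkError)`, so the test passes whichever of the three fails and for whatever reason; a rejected KMS call or a bad template would satisfy it just as well as the missing client for `KEY_URI_2`. Since this is the assertion that a client bound to `KEY_URI` is not used for another key, narrow the `with` to the one call expected to raise (presumably `handle2.primitive(aead.Aead)`, to be confirmed) and create the handle before it. The local `gcp_aead` holds a primitive for an AWS key and would read better as `aws_aead_2`. `register_client` is called with no teardown, so if registration is process-wide the bound client stays visible to the other tests in this module; a comment such as `# NOTE: registration is global and is not undone after this test.` would make that explicit. `test_encrypt_with_bad_uri` earlier in this hunk has the same over-wide `with`, which also covers the client constructor.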