aboutsummaryrefslogtreecommitdiff
path: root/google/auth/transport/_mtls_helper.py
diff options
context:
space:
mode:
Diffstat (limited to 'google/auth/transport/_mtls_helper.py')
-rw-r--r--google/auth/transport/_mtls_helper.py254
1 files changed, 254 insertions, 0 deletions
diff --git a/google/auth/transport/_mtls_helper.py b/google/auth/transport/_mtls_helper.py
new file mode 100644
index 0000000..4dccb10
--- /dev/null
+++ b/google/auth/transport/_mtls_helper.py
@@ -0,0 +1,254 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Helper functions for getting mTLS cert and key."""
+
+import json
+import logging
+from os import path
+import re
+import subprocess
+
+import six
+
+from google.auth import exceptions
+
+CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"
+_CERT_PROVIDER_COMMAND = "cert_provider_command"
+_CERT_REGEX = re.compile(
+ b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL
+)
+
+# support various format of key files, e.g.
+# "-----BEGIN PRIVATE KEY-----...",
+# "-----BEGIN EC PRIVATE KEY-----...",
+# "-----BEGIN RSA PRIVATE KEY-----..."
+# "-----BEGIN ENCRYPTED PRIVATE KEY-----"
+_KEY_REGEX = re.compile(
+ b"-----BEGIN [A-Z ]*PRIVATE KEY-----.+-----END [A-Z ]*PRIVATE KEY-----\r?\n?",
+ re.DOTALL,
+)
+
+_LOGGER = logging.getLogger(__name__)
+
+
+_PASSPHRASE_REGEX = re.compile(
+ b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----", re.DOTALL
+)
+
+
+def _check_dca_metadata_path(metadata_path):
+ """Checks for context aware metadata. If it exists, returns the absolute path;
+ otherwise returns None.
+
+ Args:
+ metadata_path (str): context aware metadata path.
+
+ Returns:
+ str: absolute path if exists and None otherwise.
+ """
+ metadata_path = path.expanduser(metadata_path)
+ if not path.exists(metadata_path):
+ _LOGGER.debug("%s is not found, skip client SSL authentication.", metadata_path)
+ return None
+ return metadata_path
+
+
+def _read_dca_metadata_file(metadata_path):
+ """Loads context aware metadata from the given path.
+
+ Args:
+ metadata_path (str): context aware metadata path.
+
+ Returns:
+ Dict[str, str]: The metadata.
+
+ Raises:
+ google.auth.exceptions.ClientCertError: If failed to parse metadata as JSON.
+ """
+ try:
+ with open(metadata_path) as f:
+ metadata = json.load(f)
+ except ValueError as caught_exc:
+ new_exc = exceptions.ClientCertError(caught_exc)
+ six.raise_from(new_exc, caught_exc)
+
+ return metadata
+
+
+def _run_cert_provider_command(command, expect_encrypted_key=False):
+ """Run the provided command, and return client side mTLS cert, key and
+ passphrase.
+
+ Args:
+ command (List[str]): cert provider command.
+ expect_encrypted_key (bool): If encrypted private key is expected.
+
+ Returns:
+ Tuple[bytes, bytes, bytes]: client certificate bytes in PEM format, key
+ bytes in PEM format and passphrase bytes.
+
+ Raises:
+ google.auth.exceptions.ClientCertError: if problems occurs when running
+ the cert provider command or generating cert, key and passphrase.
+ """
+ try:
+ process = subprocess.Popen(
+ command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+ )
+ stdout, stderr = process.communicate()
+ except OSError as caught_exc:
+ new_exc = exceptions.ClientCertError(caught_exc)
+ six.raise_from(new_exc, caught_exc)
+
+ # Check cert provider command execution error.
+ if process.returncode != 0:
+ raise exceptions.ClientCertError(
+ "Cert provider command returns non-zero status code %s" % process.returncode
+ )
+
+ # Extract certificate (chain), key and passphrase.
+ cert_match = re.findall(_CERT_REGEX, stdout)
+ if len(cert_match) != 1:
+ raise exceptions.ClientCertError("Client SSL certificate is missing or invalid")
+ key_match = re.findall(_KEY_REGEX, stdout)
+ if len(key_match) != 1:
+ raise exceptions.ClientCertError("Client SSL key is missing or invalid")
+ passphrase_match = re.findall(_PASSPHRASE_REGEX, stdout)
+
+ if expect_encrypted_key:
+ if len(passphrase_match) != 1:
+ raise exceptions.ClientCertError("Passphrase is missing or invalid")
+ if b"ENCRYPTED" not in key_match[0]:
+ raise exceptions.ClientCertError("Encrypted private key is expected")
+ return cert_match[0], key_match[0], passphrase_match[0].strip()
+
+ if b"ENCRYPTED" in key_match[0]:
+ raise exceptions.ClientCertError("Encrypted private key is not expected")
+ if len(passphrase_match) > 0:
+ raise exceptions.ClientCertError("Passphrase is not expected")
+ return cert_match[0], key_match[0], None
+
+
+def get_client_ssl_credentials(
+ generate_encrypted_key=False,
+ context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH,
+):
+ """Returns the client side certificate, private key and passphrase.
+
+ Args:
+ generate_encrypted_key (bool): If set to True, encrypted private key
+ and passphrase will be generated; otherwise, unencrypted private key
+ will be generated and passphrase will be None.
+ context_aware_metadata_path (str): The context_aware_metadata.json file path.
+
+ Returns:
+ Tuple[bool, bytes, bytes, bytes]:
+ A boolean indicating if cert, key and passphrase are obtained, the
+ cert bytes and key bytes both in PEM format, and passphrase bytes.
+
+ Raises:
+ google.auth.exceptions.ClientCertError: if problems occurs when getting
+ the cert, key and passphrase.
+ """
+ metadata_path = _check_dca_metadata_path(context_aware_metadata_path)
+
+ if metadata_path:
+ metadata_json = _read_dca_metadata_file(metadata_path)
+
+ if _CERT_PROVIDER_COMMAND not in metadata_json:
+ raise exceptions.ClientCertError("Cert provider command is not found")
+
+ command = metadata_json[_CERT_PROVIDER_COMMAND]
+
+ if generate_encrypted_key and "--with_passphrase" not in command:
+ command.append("--with_passphrase")
+
+ # Execute the command.
+ cert, key, passphrase = _run_cert_provider_command(
+ command, expect_encrypted_key=generate_encrypted_key
+ )
+ return True, cert, key, passphrase
+
+ return False, None, None, None
+
+
+def get_client_cert_and_key(client_cert_callback=None):
+ """Returns the client side certificate and private key. The function first
+ tries to get certificate and key from client_cert_callback; if the callback
+ is None or doesn't provide certificate and key, the function tries application
+ default SSL credentials.
+
+ Args:
+ client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An
+ optional callback which returns client certificate bytes and private
+ key bytes both in PEM format.
+
+ Returns:
+ Tuple[bool, bytes, bytes]:
+ A boolean indicating if cert and key are obtained, the cert bytes
+ and key bytes both in PEM format.
+
+ Raises:
+ google.auth.exceptions.ClientCertError: if problems occurs when getting
+ the cert and key.
+ """
+ if client_cert_callback:
+ cert, key = client_cert_callback()
+ return True, cert, key
+
+ has_cert, cert, key, _ = get_client_ssl_credentials(generate_encrypted_key=False)
+ return has_cert, cert, key
+
+
+def decrypt_private_key(key, passphrase):
+ """A helper function to decrypt the private key with the given passphrase.
+ google-auth library doesn't support passphrase protected private key for
+ mutual TLS channel. This helper function can be used to decrypt the
+ passphrase protected private key in order to estalish mutual TLS channel.
+
+ For example, if you have a function which produces client cert, passphrase
+ protected private key and passphrase, you can convert it to a client cert
+ callback function accepted by google-auth::
+
+ from google.auth.transport import _mtls_helper
+
+ def your_client_cert_function():
+ return cert, encrypted_key, passphrase
+
+ # callback accepted by google-auth for mutual TLS channel.
+ def client_cert_callback():
+ cert, encrypted_key, passphrase = your_client_cert_function()
+ decrypted_key = _mtls_helper.decrypt_private_key(encrypted_key,
+ passphrase)
+ return cert, decrypted_key
+
+ Args:
+ key (bytes): The private key bytes in PEM format.
+ passphrase (bytes): The passphrase bytes.
+
+ Returns:
+ bytes: The decrypted private key in PEM format.
+
+ Raises:
+ ImportError: If pyOpenSSL is not installed.
+ OpenSSL.crypto.Error: If there is any problem decrypting the private key.
+ """
+ from OpenSSL import crypto
+
+ # First convert encrypted_key_bytes to PKey object
+ pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key, passphrase=passphrase)
+
+ # Then dump the decrypted key bytes
+ return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)