aboutsummaryrefslogtreecommitdiff
path: root/catapult/telemetry/third_party/web-page-replay/certutils.py
diff options
context:
space:
mode:
Diffstat (limited to 'catapult/telemetry/third_party/web-page-replay/certutils.py')
-rw-r--r--catapult/telemetry/third_party/web-page-replay/certutils.py288
1 files changed, 288 insertions, 0 deletions
diff --git a/catapult/telemetry/third_party/web-page-replay/certutils.py b/catapult/telemetry/third_party/web-page-replay/certutils.py
new file mode 100644
index 00000000..73bfdf4e
--- /dev/null
+++ b/catapult/telemetry/third_party/web-page-replay/certutils.py
@@ -0,0 +1,288 @@
+# Copyright 2014 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Routines to generate root and server certificates.
+
+Certificate Naming Conventions:
+ ca_cert: crypto.X509 for the certificate authority (w/ both the pub &
+ priv keys)
+ cert: a crypto.X509 certificate (w/ just the pub key)
+ cert_str: a certificate string (w/ just the pub cert)
+ key: a private crypto.PKey (from ca or pem)
+ ca_cert_str: a certificae authority string (w/ both the pub & priv certs)
+"""
+
+import logging
+import os
+import platform
+import socket
+import subprocess
+import time
+
+openssl_import_error = None
+
+Error = None
+SSL_METHOD = None
+SysCallError = None
+VERIFY_PEER = None
+ZeroReturnError = None
+FILETYPE_PEM = None
+
+try:
+ from OpenSSL import crypto, SSL
+
+ Error = SSL.Error
+ SSL_METHOD = SSL.SSLv23_METHOD
+ SysCallError = SSL.SysCallError
+ VERIFY_PEER = SSL.VERIFY_PEER
+ ZeroReturnError = SSL.ZeroReturnError
+ FILETYPE_PEM = crypto.FILETYPE_PEM
+except ImportError, e:
+ openssl_import_error = e
+
+
+def get_ssl_context(method=SSL_METHOD):
+ # One of: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, or TLSv1_METHOD
+ if openssl_import_error:
+ raise openssl_import_error # pylint: disable=raising-bad-type
+ return SSL.Context(method)
+
+
+class WrappedConnection(object):
+
+ def __init__(self, obj):
+ self._wrapped_obj = obj
+
+ def __getattr__(self, attr):
+ if attr in self.__dict__:
+ return getattr(self, attr)
+ return getattr(self._wrapped_obj, attr)
+
+ def recv(self, buflen=1024, flags=0):
+ try:
+ return self._wrapped_obj.recv(buflen, flags)
+ except SSL.SysCallError, e:
+ if e.args[1] == 'Unexpected EOF':
+ return ''
+ raise
+ except SSL.ZeroReturnError:
+ return ''
+
+
+def get_ssl_connection(context, connection):
+ return WrappedConnection(SSL.Connection(context, connection))
+
+
+def load_privatekey(key, filetype=FILETYPE_PEM):
+ """Loads obj private key object from string."""
+ return crypto.load_privatekey(filetype, key)
+
+
+def load_cert(cert_str, filetype=FILETYPE_PEM):
+ """Loads obj cert object from string."""
+ return crypto.load_certificate(filetype, cert_str)
+
+
+def _dump_privatekey(key, filetype=FILETYPE_PEM):
+ """Dumps obj private key object to string."""
+ return crypto.dump_privatekey(filetype, key)
+
+
+def _dump_cert(cert, filetype=FILETYPE_PEM):
+ """Dumps obj cert object to string."""
+ return crypto.dump_certificate(filetype, cert)
+
+
+def generate_dummy_ca_cert(subject='_WebPageReplayCert'):
+ """Generates dummy certificate authority.
+
+ Args:
+ subject: a string representing the desired root cert issuer
+ Returns:
+ A tuple of the public key and the private key strings for the root
+ certificate
+ """
+ if openssl_import_error:
+ raise openssl_import_error # pylint: disable=raising-bad-type
+
+ key = crypto.PKey()
+ key.generate_key(crypto.TYPE_RSA, 1024)
+
+ ca_cert = crypto.X509()
+ ca_cert.set_serial_number(int(time.time()*10000))
+ ca_cert.set_version(2)
+ ca_cert.get_subject().CN = subject
+ ca_cert.get_subject().O = subject
+ ca_cert.gmtime_adj_notBefore(-60 * 60 * 24 * 365 * 2)
+ ca_cert.gmtime_adj_notAfter(60 * 60 * 24 * 365 * 2)
+ ca_cert.set_issuer(ca_cert.get_subject())
+ ca_cert.set_pubkey(key)
+ ca_cert.add_extensions([
+ crypto.X509Extension('basicConstraints', True, 'CA:TRUE'),
+ crypto.X509Extension('nsCertType', True, 'sslCA'),
+ crypto.X509Extension('extendedKeyUsage', True,
+ ('serverAuth,clientAuth,emailProtection,'
+ 'timeStamping,msCodeInd,msCodeCom,msCTLSign,'
+ 'msSGC,msEFS,nsSGC')),
+ crypto.X509Extension('keyUsage', False, 'keyCertSign, cRLSign'),
+ crypto.X509Extension('subjectKeyIdentifier', False, 'hash',
+ subject=ca_cert),
+ ])
+ ca_cert.sign(key, 'sha256')
+ key_str = _dump_privatekey(key)
+ ca_cert_str = _dump_cert(ca_cert)
+ return ca_cert_str, key_str
+
+
+def get_host_cert(host, port=443):
+ """Contacts the host and returns its certificate."""
+ host_certs = []
+ def verify_cb(conn, cert, errnum, depth, ok):
+ host_certs.append(cert)
+ # Return True to indicates that the certificate was ok.
+ return True
+
+ context = SSL.Context(SSL.SSLv23_METHOD)
+ context.set_verify(SSL.VERIFY_PEER, verify_cb) # Demand a certificate
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ connection = SSL.Connection(context, s)
+ try:
+ connection.connect((host, port))
+ connection.send('')
+ except SSL.SysCallError:
+ pass
+ except socket.gaierror:
+ logging.debug('Host name is not valid')
+ finally:
+ connection.shutdown()
+ connection.close()
+ if not host_certs:
+ logging.warning('Unable to get host certificate from %s:%s', host, port)
+ return ''
+ return _dump_cert(host_certs[-1])
+
+
+def write_dummy_ca_cert(ca_cert_str, key_str, cert_path):
+ """Writes four certificate files.
+
+ For example, if cert_path is "mycert.pem":
+ mycert.pem - CA plus private key
+ mycert-cert.pem - CA in PEM format
+ mycert-cert.cer - CA for Android
+ mycert-cert.p12 - CA in PKCS12 format for Windows devices
+ Args:
+ cert_path: path string such as "mycert.pem"
+ ca_cert_str: certificate string
+ key_str: private key string
+ """
+ dirname = os.path.dirname(cert_path)
+ if dirname and not os.path.exists(dirname):
+ os.makedirs(dirname)
+
+ root_path = os.path.splitext(cert_path)[0]
+ ca_cert_path = root_path + '-cert.pem'
+ android_cer_path = root_path + '-cert.cer'
+ windows_p12_path = root_path + '-cert.p12'
+
+ # Dump the CA plus private key
+ with open(cert_path, 'w') as f:
+ f.write(key_str)
+ f.write(ca_cert_str)
+
+ # Dump the certificate in PEM format
+ with open(ca_cert_path, 'w') as f:
+ f.write(ca_cert_str)
+
+ # Create a .cer file with the same contents for Android
+ with open(android_cer_path, 'w') as f:
+ f.write(ca_cert_str)
+
+ ca_cert = load_cert(ca_cert_str)
+ key = load_privatekey(key_str)
+ # Dump the certificate in PKCS12 format for Windows devices
+ with open(windows_p12_path, 'w') as f:
+ p12 = crypto.PKCS12()
+ p12.set_certificate(ca_cert)
+ p12.set_privatekey(key)
+ f.write(p12.export())
+
+
+def generate_cert(root_ca_cert_str, server_cert_str, server_host):
+ """Generates a cert_str with the sni field in server_cert_str signed by the
+ root_ca_cert_str.
+
+ Args:
+ root_ca_cert_str: PEM formatted string representing the root cert
+ server_cert_str: PEM formatted string representing cert
+ server_host: host name to use if there is no server_cert_str
+ Returns:
+ a PEM formatted certificate string
+ """
+ EXTENSION_WHITELIST = set(['subjectAltName'])
+
+ if openssl_import_error:
+ raise openssl_import_error # pylint: disable=raising-bad-type
+
+ common_name = server_host
+ reused_extensions = []
+ if server_cert_str:
+ original_cert = load_cert(server_cert_str)
+ common_name = original_cert.get_subject().commonName
+ for i in xrange(original_cert.get_extension_count()):
+ original_cert_extension = original_cert.get_extension(i)
+ if original_cert_extension.get_short_name() in EXTENSION_WHITELIST:
+ reused_extensions.append(original_cert_extension)
+
+ ca_cert = load_cert(root_ca_cert_str)
+ ca_key = load_privatekey(root_ca_cert_str)
+
+ cert = crypto.X509()
+ cert.get_subject().CN = common_name
+ cert.gmtime_adj_notBefore(-60 * 60)
+ cert.gmtime_adj_notAfter(60 * 60 * 24 * 30)
+ cert.set_issuer(ca_cert.get_subject())
+ cert.set_serial_number(int(time.time()*10000))
+ cert.set_pubkey(ca_key)
+ cert.add_extensions(reused_extensions)
+ cert.sign(ca_key, 'sha256')
+
+ return _dump_cert(cert)
+
+
+def install_cert_in_nssdb(home_directory_path, certificate_path):
+ """Installs a certificate into the ~/.pki/nssdb database.
+
+ Args:
+ home_directory_path: Path of the home directory where to install
+ certificate_path: Path of a CA in PEM format
+ """
+ assert os.path.isdir(home_directory_path)
+ assert platform.system() == 'Linux', \
+ 'SSL certification authority has only been tested for linux.'
+ if (os.path.abspath(home_directory_path) ==
+ os.path.abspath(os.environ['HOME'])):
+ raise Exception('Modifying $HOME/.pki/nssdb compromises your machine.')
+
+ cert_database_path = os.path.join(home_directory_path, '.pki', 'nssdb')
+ def certutil(args):
+ cmd = ['certutil', '--empty-password', '-d', 'sql:' + cert_database_path]
+ cmd.extend(args)
+ logging.info(subprocess.list2cmdline(cmd))
+ subprocess.check_call(cmd)
+
+ if not os.path.isdir(cert_database_path):
+ os.makedirs(cert_database_path)
+ certutil(['-N'])
+
+ certutil(['-A', '-t', 'PC,,', '-n', certificate_path, '-i', certificate_path])