diff options
Diffstat (limited to 'src/OpenSSL/crypto.py')
-rw-r--r-- | src/OpenSSL/crypto.py | 536 |
1 files changed, 333 insertions, 203 deletions
diff --git a/src/OpenSSL/crypto.py b/src/OpenSSL/crypto.py index 12e92df..4265525 100644 --- a/src/OpenSSL/crypto.py +++ b/src/OpenSSL/crypto.py @@ -1,3 +1,4 @@ +import calendar import datetime from base64 import b16encode @@ -7,11 +8,11 @@ from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__ from six import ( integer_types as _integer_types, text_type as _text_type, - PY3 as _PY3) + PY2 as _PY2, +) -from cryptography import x509 +from cryptography import utils, x509 from cryptography.hazmat.primitives.asymmetric import dsa, rsa -from cryptography.utils import deprecated from OpenSSL._util import ( ffi as _ffi, @@ -19,48 +20,49 @@ from OpenSSL._util import ( exception_from_error_queue as _exception_from_error_queue, byte_string as _byte_string, native as _native, + path_string as _path_string, UNSPECIFIED as _UNSPECIFIED, text_to_bytes_and_warn as _text_to_bytes_and_warn, make_assert as _make_assert, ) __all__ = [ - 'FILETYPE_PEM', - 'FILETYPE_ASN1', - 'FILETYPE_TEXT', - 'TYPE_RSA', - 'TYPE_DSA', - 'Error', - 'PKey', - 'get_elliptic_curves', - 'get_elliptic_curve', - 'X509Name', - 'X509Extension', - 'X509Req', - 'X509', - 'X509StoreFlags', - 'X509Store', - 'X509StoreContextError', - 'X509StoreContext', - 'load_certificate', - 'dump_certificate', - 'dump_publickey', - 'dump_privatekey', - 'Revoked', - 'CRL', - 'PKCS7', - 'PKCS12', - 'NetscapeSPKI', - 'load_publickey', - 'load_privatekey', - 'dump_certificate_request', - 'load_certificate_request', - 'sign', - 'verify', - 'dump_crl', - 'load_crl', - 'load_pkcs7_data', - 'load_pkcs12' + "FILETYPE_PEM", + "FILETYPE_ASN1", + "FILETYPE_TEXT", + "TYPE_RSA", + "TYPE_DSA", + "Error", + "PKey", + "get_elliptic_curves", + "get_elliptic_curve", + "X509Name", + "X509Extension", + "X509Req", + "X509", + "X509StoreFlags", + "X509Store", + "X509StoreContextError", + "X509StoreContext", + "load_certificate", + "dump_certificate", + "dump_publickey", + "dump_privatekey", + "Revoked", + "CRL", + "PKCS7", + "PKCS12", + "NetscapeSPKI", + "load_publickey", + "load_privatekey", + "dump_certificate_request", + "load_certificate_request", + "sign", + "verify", + "dump_crl", + "load_crl", + "load_pkcs7_data", + "load_pkcs12", ] FILETYPE_PEM = _lib.SSL_FILETYPE_PEM @@ -94,6 +96,7 @@ def _get_backend(): triggering this side effect unless _get_backend is called. """ from cryptography.hazmat.backends.openssl.backend import backend + return backend @@ -136,7 +139,7 @@ def _bio_to_string(bio): """ Copy the contents of an OpenSSL BIO object into a Python byte string. """ - result_buffer = _ffi.new('char**') + result_buffer = _ffi.new("char**") buffer_length = _lib.BIO_get_mem_data(bio, result_buffer) return _ffi.buffer(result_buffer[0], buffer_length)[:] @@ -173,7 +176,7 @@ def _get_asn1_time(timestamp): @return: The time value from C{timestamp} as a L{bytes} string in a certain format. Or C{None} if the object contains no time value. """ - string_timestamp = _ffi.cast('ASN1_STRING*', timestamp) + string_timestamp = _ffi.cast("ASN1_STRING*", timestamp) if _lib.ASN1_STRING_length(string_timestamp) == 0: return None elif ( @@ -196,7 +199,8 @@ def _get_asn1_time(timestamp): _untested_error("ASN1_TIME_to_generalizedtime") else: string_timestamp = _ffi.cast( - "ASN1_STRING*", generalized_timestamp[0]) + "ASN1_STRING*", generalized_timestamp[0] + ) string_data = _lib.ASN1_STRING_data(string_timestamp) string_result = _ffi.string(string_data) _lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0]) @@ -220,6 +224,7 @@ class PKey(object): """ A class representing an DSA or RSA public key or key pair. """ + _only_public = False _initialized = True @@ -258,8 +263,15 @@ class PKey(object): .. versionadded:: 16.1.0 """ pkey = cls() - if not isinstance(crypto_key, (rsa.RSAPublicKey, rsa.RSAPrivateKey, - dsa.DSAPublicKey, dsa.DSAPrivateKey)): + if not isinstance( + crypto_key, + ( + rsa.RSAPublicKey, + rsa.RSAPrivateKey, + dsa.DSAPublicKey, + dsa.DSAPrivateKey, + ), + ): raise TypeError("Unsupported key type") pkey._pkey = crypto_key._evp_pkey @@ -346,7 +358,7 @@ class PKey(object): rsa = _lib.EVP_PKEY_get1_RSA(self._pkey) rsa = _ffi.gc(rsa, _lib.RSA_free) result = _lib.RSA_check_key(rsa) - if result: + if result == 1: return True _raise_current_error() @@ -367,13 +379,6 @@ class PKey(object): return _lib.EVP_PKEY_bits(self._pkey) -PKeyType = deprecated( - PKey, __name__, - "PKeyType has been deprecated, use PKey instead", - DeprecationWarning -) - - class _EllipticCurve(object): """ A representation of a supported elliptic curve. @@ -383,10 +388,11 @@ class _EllipticCurve(object): instances each of which represents one curve supported by the system. @type _curves: :py:type:`NoneType` or :py:type:`set` """ + _curves = None - if _PY3: - # This only necessary on Python 3. Morever, it is broken on Python 2. + if not _PY2: + # This only necessary on Python 3. Moreover, it is broken on Python 2. def __ne__(self, other): """ Implement cooperation with the right-hand side argument of ``!=``. @@ -409,14 +415,12 @@ class _EllipticCurve(object): elliptic curves the underlying library supports. """ num_curves = lib.EC_get_builtin_curves(_ffi.NULL, 0) - builtin_curves = _ffi.new('EC_builtin_curve[]', num_curves) + builtin_curves = _ffi.new("EC_builtin_curve[]", num_curves) # The return value on this call should be num_curves again. We # could check it to make sure but if it *isn't* then.. what could # we do? Abort the whole process, I suppose...? -exarkun lib.EC_get_builtin_curves(builtin_curves, num_curves) - return set( - cls.from_nid(lib, c.nid) - for c in builtin_curves) + return set(cls.from_nid(lib, c.nid) for c in builtin_curves) @classmethod def _get_elliptic_curves(cls, lib): @@ -549,14 +553,16 @@ class X509Name(object): self._name = _ffi.gc(name, _lib.X509_NAME_free) def __setattr__(self, name, value): - if name.startswith('_'): + if name.startswith("_"): return super(X509Name, self).__setattr__(name, value) # Note: we really do not want str subclasses here, so we do not use # isinstance. if type(name) is not str: - raise TypeError("attribute name must be string, not '%.200s'" % ( - type(value).__name__,)) + raise TypeError( + "attribute name must be string, not '%.200s'" + % (type(value).__name__,) + ) nid = _lib.OBJ_txt2nid(_byte_string(name)) if nid == _lib.NID_undef: @@ -577,10 +583,11 @@ class X509Name(object): break if isinstance(value, _text_type): - value = value.encode('utf-8') + value = value.encode("utf-8") add_result = _lib.X509_NAME_add_entry_by_NID( - self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0) + self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0 + ) if not add_result: _raise_current_error() @@ -616,9 +623,9 @@ class X509Name(object): _openssl_assert(data_length >= 0) try: - result = _ffi.buffer( - result_buffer[0], data_length - )[:].decode('utf-8') + result = _ffi.buffer(result_buffer[0], data_length)[:].decode( + "utf-8" + ) finally: # XXX untested _lib.OPENSSL_free(result_buffer[0]) @@ -630,6 +637,7 @@ class X509Name(object): return NotImplemented result = _lib.X509_NAME_cmp(self._name, other._name) return op(result, 0) + return f __eq__ = _cmp(__eq__) @@ -647,11 +655,13 @@ class X509Name(object): """ result_buffer = _ffi.new("char[]", 512) format_result = _lib.X509_NAME_oneline( - self._name, result_buffer, len(result_buffer)) + self._name, result_buffer, len(result_buffer) + ) _openssl_assert(format_result != _ffi.NULL) return "<X509Name object '%s'>" % ( - _native(_ffi.string(result_buffer)),) + _native(_ffi.string(result_buffer)), + ) def hash(self): """ @@ -672,7 +682,7 @@ class X509Name(object): :return: The DER encoded form of this name. :rtype: :py:class:`bytes` """ - result_buffer = _ffi.new('unsigned char**') + result_buffer = _ffi.new("unsigned char**") encode_result = _lib.i2d_X509_NAME(self._name, result_buffer) _openssl_assert(encode_result >= 0) @@ -699,20 +709,14 @@ class X509Name(object): # ffi.string does not handle strings containing NULL bytes # (which may have been generated by old, broken software) - value = _ffi.buffer(_lib.ASN1_STRING_data(fval), - _lib.ASN1_STRING_length(fval))[:] + value = _ffi.buffer( + _lib.ASN1_STRING_data(fval), _lib.ASN1_STRING_length(fval) + )[:] result.append((_ffi.string(name), value)) return result -X509NameType = deprecated( - X509Name, __name__, - "X509NameType has been deprecated, use X509Name instead", - DeprecationWarning -) - - class X509Extension(object): """ An X.509 v3 certificate extension. @@ -808,7 +812,8 @@ class X509Extension(object): parts.append(_native(_bio_to_string(bio))) else: value = _native( - _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:]) + _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:] + ) parts.append(label + ":" + value) return ", ".join(parts) @@ -858,19 +863,12 @@ class X509Extension(object): .. versionadded:: 0.12 """ octet_result = _lib.X509_EXTENSION_get_data(self._extension) - string_result = _ffi.cast('ASN1_STRING*', octet_result) + string_result = _ffi.cast("ASN1_STRING*", octet_result) char_result = _lib.ASN1_STRING_data(string_result) result_length = _lib.ASN1_STRING_length(string_result) return _ffi.buffer(char_result, result_length)[:] -X509ExtensionType = deprecated( - X509Extension, __name__, - "X509ExtensionType has been deprecated, use X509Extension instead", - DeprecationWarning -) - - class X509Req(object): """ An X.509 certificate signing requests. @@ -891,8 +889,9 @@ class X509Req(object): .. versionadded:: 17.1.0 """ from cryptography.hazmat.backends.openssl.x509 import ( - _CertificateSigningRequest + _CertificateSigningRequest, ) + backend = _get_backend() return _CertificateSigningRequest(backend, self._req) @@ -1018,9 +1017,20 @@ class X509Req(object): """ exts = [] native_exts_obj = _lib.X509_REQ_get_extensions(self._req) + native_exts_obj = _ffi.gc( + native_exts_obj, + lambda x: _lib.sk_X509_EXTENSION_pop_free( + x, + _ffi.addressof(_lib._original_lib, "X509_EXTENSION_free"), + ), + ) + for i in range(_lib.sk_X509_EXTENSION_num(native_exts_obj)): ext = X509Extension.__new__(X509Extension) - ext._extension = _lib.sk_X509_EXTENSION_value(native_exts_obj, i) + extension = _lib.X509_EXTENSION_dup( + _lib.sk_X509_EXTENSION_value(native_exts_obj, i) + ) + ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free) exts.append(ext) return exts @@ -1070,17 +1080,11 @@ class X509Req(object): return result -X509ReqType = deprecated( - X509Req, __name__, - "X509ReqType has been deprecated, use X509Req instead", - DeprecationWarning -) - - class X509(object): """ An X.509 certificate. """ + def __init__(self): x509 = _lib.X509_new() _openssl_assert(x509 != _ffi.NULL) @@ -1106,6 +1110,7 @@ class X509(object): .. versionadded:: 17.1.0 """ from cryptography.hazmat.backends.openssl.x509 import _Certificate + backend = _get_backend() return _Certificate(backend, self._x509) @@ -1247,12 +1252,16 @@ class X509(object): result_length[0] = len(result_buffer) digest_result = _lib.X509_digest( - self._x509, digest, result_buffer, result_length) + self._x509, digest, result_buffer, result_length + ) _openssl_assert(digest_result == 1) - return b":".join([ - b16encode(ch).upper() for ch - in _ffi.buffer(result_buffer, result_length[0])]) + return b":".join( + [ + b16encode(ch).upper() + for ch in _ffi.buffer(result_buffer, result_length[0]) + ] + ) def subject_name_hash(self): """ @@ -1277,7 +1286,7 @@ class X509(object): hex_serial = hex(serial)[2:] if not isinstance(hex_serial, bytes): - hex_serial = hex_serial.encode('ascii') + hex_serial = hex_serial.encode("ascii") bignum_serial = _ffi.new("BIGNUM**") @@ -1288,7 +1297,8 @@ class X509(object): if bignum_serial[0] == _ffi.NULL: set_result = _lib.ASN1_INTEGER_set( - _lib.X509_get_serialNumber(self._x509), small_serial) + _lib.X509_get_serialNumber(self._x509), small_serial + ) if set_result: # TODO Not tested _raise_current_error() @@ -1333,7 +1343,7 @@ class X509(object): if not isinstance(amount, int): raise TypeError("amount must be an integer") - notAfter = _lib.X509_get_notAfter(self._x509) + notAfter = _lib.X509_getm_notAfter(self._x509) _lib.X509_gmtime_adj(notAfter, amount) def gmtime_adj_notBefore(self, amount): @@ -1346,7 +1356,7 @@ class X509(object): if not isinstance(amount, int): raise TypeError("amount must be an integer") - notBefore = _lib.X509_get_notBefore(self._x509) + notBefore = _lib.X509_getm_notBefore(self._x509) _lib.X509_gmtime_adj(notBefore, amount) def has_expired(self): @@ -1375,7 +1385,7 @@ class X509(object): :return: A timestamp string, or ``None`` if there is none. :rtype: bytes or NoneType """ - return self._get_boundary_time(_lib.X509_get_notBefore) + return self._get_boundary_time(_lib.X509_getm_notBefore) def _set_boundary_time(self, which, when): return _set_asn1_time(which(self._x509), when) @@ -1391,7 +1401,7 @@ class X509(object): :param bytes when: A timestamp string. :return: ``None`` """ - return self._set_boundary_time(_lib.X509_get_notBefore, when) + return self._set_boundary_time(_lib.X509_getm_notBefore, when) def get_notAfter(self): """ @@ -1404,7 +1414,7 @@ class X509(object): :return: A timestamp string, or ``None`` if there is none. :rtype: bytes or NoneType """ - return self._get_boundary_time(_lib.X509_get_notAfter) + return self._get_boundary_time(_lib.X509_getm_notAfter) def set_notAfter(self, when): """ @@ -1417,7 +1427,7 @@ class X509(object): :param bytes when: A timestamp string. :return: ``None`` """ - return self._set_boundary_time(_lib.X509_get_notAfter, when) + return self._set_boundary_time(_lib.X509_getm_notAfter, when) def _get_name(self, which): name = X509Name.__new__(X509Name) @@ -1543,13 +1553,6 @@ class X509(object): return ext -X509Type = deprecated( - X509, __name__, - "X509Type has been deprecated, use X509 instead", - DeprecationWarning -) - - class X509StoreFlags(object): """ Flags for X509 verification, used to change the behavior of @@ -1560,6 +1563,7 @@ class X509StoreFlags(object): .. _OpenSSL Verification Flags: https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html """ + CRL_CHECK = _lib.X509_V_FLAG_CRL_CHECK CRL_CHECK_ALL = _lib.X509_V_FLAG_CRL_CHECK_ALL IGNORE_CRITICAL = _lib.X509_V_FLAG_IGNORE_CRITICAL @@ -1610,16 +1614,8 @@ class X509Store(object): if not isinstance(cert, X509): raise TypeError() - # As of OpenSSL 1.1.0i adding the same cert to the store more than - # once doesn't cause an error. Accordingly, this code now silences - # the error for OpenSSL < 1.1.0i as well. - if _lib.X509_STORE_add_cert(self._store, cert._x509) == 0: - code = _lib.ERR_peek_error() - err_reason = _lib.ERR_GET_REASON(code) - _openssl_assert( - err_reason == _lib.X509_R_CERT_ALREADY_IN_HASH_TABLE - ) - _lib.ERR_clear_error() + res = _lib.X509_STORE_add_cert(self._store, cert._x509) + _openssl_assert(res == 1) def add_crl(self, crl): """ @@ -1680,15 +1676,57 @@ class X509Store(object): param = _lib.X509_VERIFY_PARAM_new() param = _ffi.gc(param, _lib.X509_VERIFY_PARAM_free) - _lib.X509_VERIFY_PARAM_set_time(param, int(vfy_time.strftime('%s'))) + _lib.X509_VERIFY_PARAM_set_time( + param, calendar.timegm(vfy_time.timetuple()) + ) _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0) + def load_locations(self, cafile, capath=None): + """ + Let X509Store know where we can find trusted certificates for the + certificate chain. Note that the certificates have to be in PEM + format. -X509StoreType = deprecated( - X509Store, __name__, - "X509StoreType has been deprecated, use X509Store instead", - DeprecationWarning -) + If *capath* is passed, it must be a directory prepared using the + ``c_rehash`` tool included with OpenSSL. Either, but not both, of + *cafile* or *capath* may be ``None``. + + .. note:: + + Both *cafile* and *capath* may be set simultaneously. + + Call this method multiple times to add more than one location. + For example, CA certificates, and certificate revocation list bundles + may be passed in *cafile* in subsequent calls to this method. + + .. versionadded:: 20.0 + + :param cafile: In which file we can find the certificates (``bytes`` or + ``unicode``). + :param capath: In which directory we can find the certificates + (``bytes`` or ``unicode``). + + :return: ``None`` if the locations were set successfully. + + :raises OpenSSL.crypto.Error: If both *cafile* and *capath* is ``None`` + or the locations could not be set for any reason. + + """ + if cafile is None: + cafile = _ffi.NULL + else: + cafile = _path_string(cafile) + + if capath is None: + capath = _ffi.NULL + else: + capath = _path_string(capath) + + load_result = _lib.X509_STORE_load_locations( + self._store, cafile, capath + ) + if not load_result: + _raise_current_error() class X509StoreContextError(Exception): @@ -1718,21 +1756,54 @@ class X509StoreContext(object): collected. :ivar _store: See the ``store`` ``__init__`` parameter. :ivar _cert: See the ``certificate`` ``__init__`` parameter. + :ivar _chain: See the ``chain`` ``__init__`` parameter. :param X509Store store: The certificates which will be trusted for the purposes of any verifications. :param X509 certificate: The certificate to be verified. + :param chain: List of untrusted certificates that may be used for building + the certificate chain. May be ``None``. + :type chain: :class:`list` of :class:`X509` """ - def __init__(self, store, certificate): + def __init__(self, store, certificate, chain=None): store_ctx = _lib.X509_STORE_CTX_new() self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free) self._store = store self._cert = certificate + self._chain = self._build_certificate_stack(chain) # Make the store context available for use after instantiating this # class by initializing it now. Per testing, subsequent calls to # :meth:`_init` have no adverse affect. self._init() + @staticmethod + def _build_certificate_stack(certificates): + def cleanup(s): + # Equivalent to sk_X509_pop_free, but we don't + # currently have a CFFI binding for that available + for i in range(_lib.sk_X509_num(s)): + x = _lib.sk_X509_value(s, i) + _lib.X509_free(x) + _lib.sk_X509_free(s) + + if certificates is None or len(certificates) == 0: + return _ffi.NULL + + stack = _lib.sk_X509_new_null() + _openssl_assert(stack != _ffi.NULL) + stack = _ffi.gc(stack, cleanup) + + for cert in certificates: + if not isinstance(cert, X509): + raise TypeError("One of the elements is not an X509 instance") + + _openssl_assert(_lib.X509_up_ref(cert._x509) > 0) + if _lib.sk_X509_push(stack, cert._x509) <= 0: + _lib.X509_free(cert._x509) + _raise_current_error() + + return stack + def _init(self): """ Set up the store context for a subsequent verification operation. @@ -1741,7 +1812,7 @@ class X509StoreContext(object): :meth:`_cleanup` will leak memory. """ ret = _lib.X509_STORE_CTX_init( - self._store_ctx, self._store._store, self._cert._x509, _ffi.NULL + self._store_ctx, self._store._store, self._cert._x509, self._chain ) if ret <= 0: _raise_current_error() @@ -1765,8 +1836,13 @@ class X509StoreContext(object): errors = [ _lib.X509_STORE_CTX_get_error(self._store_ctx), _lib.X509_STORE_CTX_get_error_depth(self._store_ctx), - _native(_ffi.string(_lib.X509_verify_cert_error_string( - _lib.X509_STORE_CTX_get_error(self._store_ctx)))), + _native( + _ffi.string( + _lib.X509_verify_cert_error_string( + _lib.X509_STORE_CTX_get_error(self._store_ctx) + ) + ) + ), ] # A context error should always be associated with a certificate, so we # expect this call to never return :class:`None`. @@ -1808,6 +1884,45 @@ class X509StoreContext(object): if ret <= 0: raise self._exception_from_context() + def get_verified_chain(self): + """ + Verify a certificate in a context and return the complete validated + chain. + + :raises X509StoreContextError: If an error occurred when validating a + certificate in the context. Sets ``certificate`` attribute to + indicate which certificate caused the error. + + .. versionadded:: 20.0 + """ + # Always re-initialize the store context in case + # :meth:`verify_certificate` is called multiple times. + # + # :meth:`_init` is called in :meth:`__init__` so _cleanup is called + # before _init to ensure memory is not leaked. + self._cleanup() + self._init() + ret = _lib.X509_verify_cert(self._store_ctx) + if ret <= 0: + self._cleanup() + raise self._exception_from_context() + + # Note: X509_STORE_CTX_get1_chain returns a deep copy of the chain. + cert_stack = _lib.X509_STORE_CTX_get1_chain(self._store_ctx) + _openssl_assert(cert_stack != _ffi.NULL) + + result = [] + for i in range(_lib.sk_X509_num(cert_stack)): + cert = _lib.sk_X509_value(cert_stack, i) + _openssl_assert(cert != _ffi.NULL) + pycert = X509._from_raw_x509_ptr(cert) + result.append(pycert) + + # Free the stack but not the members which are freed by the X509 class. + _lib.sk_X509_free(cert_stack) + self._cleanup() + return result + def load_certificate(type, buffer): """ @@ -1830,8 +1945,7 @@ def load_certificate(type, buffer): elif type == FILETYPE_ASN1: x509 = _lib.d2i_X509_bio(bio, _ffi.NULL) else: - raise ValueError( - "type argument must be FILETYPE_PEM or FILETYPE_ASN1") + raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") if x509 == _ffi.NULL: _raise_current_error() @@ -1860,9 +1974,10 @@ def dump_certificate(type, cert): else: raise ValueError( "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " - "FILETYPE_TEXT") + "FILETYPE_TEXT" + ) - assert result_code == 1 + _openssl_assert(result_code == 1) return _bio_to_string(bio) @@ -1916,7 +2031,8 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None): if passphrase is None: raise TypeError( "if a value is given for cipher " - "one must also be given for passphrase") + "one must also be given for passphrase" + ) cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher)) if cipher_obj == _ffi.NULL: raise ValueError("Invalid cipher name") @@ -1926,8 +2042,14 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None): helper = _PassphraseHelper(type, passphrase) if type == FILETYPE_PEM: result_code = _lib.PEM_write_bio_PrivateKey( - bio, pkey._pkey, cipher_obj, _ffi.NULL, 0, - helper.callback, helper.callback_args) + bio, + pkey._pkey, + cipher_obj, + _ffi.NULL, + 0, + helper.callback, + helper.callback_args, + ) helper.raise_if_problem() elif type == FILETYPE_ASN1: result_code = _lib.i2d_PrivateKey_bio(bio, pkey._pkey) @@ -1935,15 +2057,13 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None): if _lib.EVP_PKEY_id(pkey._pkey) != _lib.EVP_PKEY_RSA: raise TypeError("Only RSA keys are supported for FILETYPE_TEXT") - rsa = _ffi.gc( - _lib.EVP_PKEY_get1_RSA(pkey._pkey), - _lib.RSA_free - ) + rsa = _ffi.gc(_lib.EVP_PKEY_get1_RSA(pkey._pkey), _lib.RSA_free) result_code = _lib.RSA_print(bio, rsa, 0) else: raise ValueError( "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " - "FILETYPE_TEXT") + "FILETYPE_TEXT" + ) _openssl_assert(result_code != 0) @@ -1954,6 +2074,7 @@ class Revoked(object): """ A certificate revocation. """ + # https://www.openssl.org/docs/manmaster/man5/x509v3_config.html#CRL-distribution-points # which differs from crl_reasons of crypto/x509v3/v3_enum.c that matches # OCSP_crl_reason_str. We use the latter, just like the command line @@ -1993,7 +2114,8 @@ class Revoked(object): asn1_serial = _ffi.gc( _lib.BN_to_ASN1_INTEGER(bignum_serial, _ffi.NULL), - _lib.ASN1_INTEGER_free) + _lib.ASN1_INTEGER_free, + ) _lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial) def get_serial(self): @@ -2044,7 +2166,7 @@ class Revoked(object): elif not isinstance(reason, bytes): raise TypeError("reason must be None or a byte string") else: - reason = reason.lower().replace(b' ', b'') + reason = reason.lower().replace(b" ", b"") reason_code = [r.lower() for r in self._crl_reasons].index(reason) new_reason_ext = _lib.ASN1_ENUMERATED_new() @@ -2056,7 +2178,8 @@ class Revoked(object): self._delete_reason() add_result = _lib.X509_REVOKED_add1_ext_i2d( - self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0) + self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0 + ) _openssl_assert(add_result == 1) def get_reason(self): @@ -2138,8 +2261,9 @@ class CRL(object): .. versionadded:: 17.1.0 """ from cryptography.hazmat.backends.openssl.x509 import ( - _CertificateRevocationList + _CertificateRevocationList, ) + backend = _get_backend() return _CertificateRevocationList(backend, self._crl) @@ -2246,7 +2370,7 @@ class CRL(object): def set_nextUpdate(self, when): """ - Set when the CRL will next be udpated. + Set when the CRL will next be updated. The timestamp is formatted as an ASN.1 TIME:: @@ -2279,13 +2403,15 @@ class CRL(object): digest_obj = _lib.EVP_get_digestbyname(digest) _openssl_assert(digest_obj != _ffi.NULL) _lib.X509_CRL_set_issuer_name( - self._crl, _lib.X509_get_subject_name(issuer_cert._x509)) + self._crl, _lib.X509_get_subject_name(issuer_cert._x509) + ) _lib.X509_CRL_sort(self._crl) result = _lib.X509_CRL_sign(self._crl, issuer_key._pkey, digest_obj) _openssl_assert(result != 0) - def export(self, cert, key, type=FILETYPE_PEM, days=100, - digest=_UNSPECIFIED): + def export( + self, cert, key, type=FILETYPE_PEM, days=100, digest=_UNSPECIFIED + ): """ Export the CRL as a string. @@ -2295,7 +2421,7 @@ class CRL(object): :data:`FILETYPE_ASN1`, or :data:`FILETYPE_TEXT`. :param int days: The number of days until the next update of this CRL. :param bytes digest: The name of the message digest to use (eg - ``b"sha2566"``). + ``b"sha256"``). :rtype: bytes """ @@ -2338,13 +2464,6 @@ class CRL(object): return dump_crl(type, self) -CRLType = deprecated( - CRL, __name__, - "CRLType has been deprecated, use CRL instead", - DeprecationWarning -) - - class PKCS7(object): def type_is_signed(self): """ @@ -2389,13 +2508,6 @@ class PKCS7(object): return _ffi.string(string_type) -PKCS7Type = deprecated( - PKCS7, __name__, - "PKCS7Type has been deprecated, use PKCS7 instead", - DeprecationWarning -) - - class PKCS12(object): """ A PKCS #12 archive. @@ -2557,10 +2669,17 @@ class PKCS12(object): cert = self._cert._x509 pkcs12 = _lib.PKCS12_create( - passphrase, friendlyname, pkey, cert, cacerts, + passphrase, + friendlyname, + pkey, + cert, + cacerts, _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC, _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC, - iter, maciter, 0) + iter, + maciter, + 0, + ) if pkcs12 == _ffi.NULL: _raise_current_error() pkcs12 = _ffi.gc(pkcs12, _lib.PKCS12_free) @@ -2570,13 +2689,6 @@ class PKCS12(object): return _bio_to_string(bio) -PKCS12Type = deprecated( - PKCS12, __name__, - "PKCS12Type has been deprecated, use PKCS12 instead", - DeprecationWarning -) - - class NetscapeSPKI(object): """ A Netscape SPKI object. @@ -2667,13 +2779,6 @@ class NetscapeSPKI(object): _openssl_assert(set_result == 1) -NetscapeSPKIType = deprecated( - NetscapeSPKI, __name__, - "NetscapeSPKIType has been deprecated, use NetscapeSPKI instead", - DeprecationWarning -) - - class _PassphraseHelper(object): def __init__(self, type, passphrase, more_args=False, truncate=False): if type != FILETYPE_PEM and passphrase is not None: @@ -2689,9 +2794,7 @@ class _PassphraseHelper(object): def callback(self): if self._passphrase is None: return _ffi.NULL - elif isinstance(self._passphrase, bytes): - return _ffi.NULL - elif callable(self._passphrase): + elif isinstance(self._passphrase, bytes) or callable(self._passphrase): return _ffi.callback("pem_password_cb", self._read_passphrase) else: raise TypeError( @@ -2702,9 +2805,7 @@ class _PassphraseHelper(object): def callback_args(self): if self._passphrase is None: return _ffi.NULL - elif isinstance(self._passphrase, bytes): - return self._passphrase - elif callable(self._passphrase): + elif isinstance(self._passphrase, bytes) or callable(self._passphrase): return _ffi.NULL else: raise TypeError( @@ -2724,12 +2825,15 @@ class _PassphraseHelper(object): def _read_passphrase(self, buf, size, rwflag, userdata): try: - if self._more_args: - result = self._passphrase(size, rwflag, userdata) + if callable(self._passphrase): + if self._more_args: + result = self._passphrase(size, rwflag, userdata) + else: + result = self._passphrase(rwflag) else: - result = self._passphrase(rwflag) + result = self._passphrase if not isinstance(result, bytes): - raise ValueError("String expected") + raise ValueError("Bytes expected") if len(result) > size: if self._truncate: result = result[:size] @@ -2738,7 +2842,7 @@ class _PassphraseHelper(object): "passphrase returned by callback is too long" ) for i in range(len(result)): - buf[i] = result[i:i + 1] + buf[i] = result[i : i + 1] return len(result) except Exception as e: self._problems.append(e) @@ -2763,7 +2867,8 @@ def load_publickey(type, buffer): if type == FILETYPE_PEM: evp_pkey = _lib.PEM_read_bio_PUBKEY( - bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) + bio, _ffi.NULL, _ffi.NULL, _ffi.NULL + ) elif type == FILETYPE_ASN1: evp_pkey = _lib.d2i_PUBKEY_bio(bio, _ffi.NULL) else: @@ -2799,7 +2904,8 @@ def load_privatekey(type, buffer, passphrase=None): helper = _PassphraseHelper(type, passphrase) if type == FILETYPE_PEM: evp_pkey = _lib.PEM_read_bio_PrivateKey( - bio, _ffi.NULL, helper.callback, helper.callback_args) + bio, _ffi.NULL, helper.callback, helper.callback_args + ) helper.raise_if_problem() elif type == FILETYPE_ASN1: evp_pkey = _lib.d2i_PrivateKey_bio(bio, _ffi.NULL) @@ -2898,7 +3004,8 @@ def sign(pkey, data, digest): signature_buffer = _ffi.new("unsigned char[]", length) signature_length = _ffi.new("unsigned int *") final_result = _lib.EVP_SignFinal( - md_ctx, signature_buffer, signature_length, pkey._pkey) + md_ctx, signature_buffer, signature_length, pkey._pkey + ) _openssl_assert(final_result == 1) return _ffi.buffer(signature_buffer, signature_length[0])[:] @@ -2962,9 +3069,10 @@ def dump_crl(type, crl): else: raise ValueError( "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " - "FILETYPE_TEXT") + "FILETYPE_TEXT" + ) - assert ret == 1 + _openssl_assert(ret == 1) return _bio_to_string(bio) @@ -3027,6 +3135,17 @@ def load_pkcs7_data(type, buffer): return pypkcs7 +load_pkcs7_data = utils.deprecated( + load_pkcs7_data, + __name__, + ( + "PKCS#7 support in pyOpenSSL is deprecated. You should use the APIs " + "in cryptography." + ), + DeprecationWarning, +) + + def load_pkcs12(buffer, passphrase=None): """ Load pkcs12 data from the string *buffer*. If the pkcs12 structure is @@ -3114,6 +3233,17 @@ def load_pkcs12(buffer, passphrase=None): return pkcs12 +load_pkcs12 = utils.deprecated( + load_pkcs12, + __name__, + ( + "PKCS#12 support in pyOpenSSL is deprecated. You should use the APIs " + "in cryptography." + ), + DeprecationWarning, +) + + # There are no direct unit tests for this initialization. It is tested # indirectly since it is necessary for functions like dump_privatekey when # using encryption. @@ -3132,4 +3262,4 @@ _lib.SSL_load_error_strings() # Set the default string mask to match OpenSSL upstream (since 2005) and # RFC5280 recommendations. -_lib.ASN1_STRING_set_default_mask_asc(b'utf8only') +_lib.ASN1_STRING_set_default_mask_asc(b"utf8only") |