diff options
Diffstat (limited to 'src/cryptography/hazmat/backends/openssl/decode_asn1.py')
-rw-r--r-- | src/cryptography/hazmat/backends/openssl/decode_asn1.py | 260 |
1 files changed, 123 insertions, 137 deletions
diff --git a/src/cryptography/hazmat/backends/openssl/decode_asn1.py b/src/cryptography/hazmat/backends/openssl/decode_asn1.py index 007675d4e..cc9b8c0e3 100644 --- a/src/cryptography/hazmat/backends/openssl/decode_asn1.py +++ b/src/cryptography/hazmat/backends/openssl/decode_asn1.py @@ -7,23 +7,20 @@ from __future__ import absolute_import, division, print_function import datetime import ipaddress -import asn1crypto.core - import six from cryptography import x509 +from cryptography.hazmat._der import DERReader, INTEGER, NULL, SEQUENCE from cryptography.x509.extensions import _TLS_FEATURE_TYPE_TO_ENUM from cryptography.x509.name import _ASN1_TYPE_TO_ENUM from cryptography.x509.oid import ( - CRLEntryExtensionOID, CertificatePoliciesOID, ExtensionOID, + CRLEntryExtensionOID, + CertificatePoliciesOID, + ExtensionOID, OCSPExtensionOID, ) -class _Integers(asn1crypto.core.SequenceOf): - _child_spec = asn1crypto.core.Integer - - def _obj2txt(backend, obj): # Set to 80 on the recommendation of # https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values @@ -67,9 +64,9 @@ def _decode_x509_name(backend, x509_name): for x in range(count): entry = backend._lib.X509_NAME_get_entry(x509_name, x) attribute = _decode_x509_name_entry(backend, entry) - set_id = backend._lib.Cryptography_X509_NAME_ENTRY_set(entry) + set_id = backend._lib.X509_NAME_ENTRY_set(entry) if set_id != prev_set_id: - attributes.append(set([attribute])) + attributes.append({attribute}) else: # is in the same RDN a previous entry attributes[-1].add(attribute) @@ -124,10 +121,10 @@ def _decode_general_name(backend, gn): # netmask. To handle this we convert the netmask to integer, then # find the first 0 bit, which will be the prefix. If another 1 # bit is present after that the netmask is invalid. - base = ipaddress.ip_address(data[:data_len // 2]) - netmask = ipaddress.ip_address(data[data_len // 2:]) + base = ipaddress.ip_address(data[: data_len // 2]) + netmask = ipaddress.ip_address(data[data_len // 2 :]) bits = bin(int(netmask))[2:] - prefix = bits.find('0') + prefix = bits.find("0") # If no 0 bits are found it is a /32 or /128 if prefix == -1: prefix = len(bits) @@ -135,7 +132,7 @@ def _decode_general_name(backend, gn): if "1" in bits[prefix:]: raise ValueError("Invalid netmask") - ip = ipaddress.ip_network(base.exploded + u"/{0}".format(prefix)) + ip = ipaddress.ip_network(base.exploded + u"/{}".format(prefix)) else: ip = ipaddress.ip_address(data) @@ -160,10 +157,10 @@ def _decode_general_name(backend, gn): else: # x400Address or ediPartyName raise x509.UnsupportedGeneralNameType( - "{0} is not a supported type".format( + "{} is not a supported type".format( x509._GENERAL_NAMES.get(gn.type, gn.type) ), - gn.type + gn.type, ) @@ -184,48 +181,57 @@ def _decode_delta_crl_indicator(backend, ext): class _X509ExtensionParser(object): - def __init__(self, ext_count, get_ext, handlers): + def __init__(self, backend, ext_count, get_ext, handlers): self.ext_count = ext_count self.get_ext = get_ext self.handlers = handlers + self._backend = backend - def parse(self, backend, x509_obj): + def parse(self, x509_obj): extensions = [] seen_oids = set() - for i in range(self.ext_count(backend, x509_obj)): - ext = self.get_ext(backend, x509_obj, i) - backend.openssl_assert(ext != backend._ffi.NULL) - crit = backend._lib.X509_EXTENSION_get_critical(ext) + for i in range(self.ext_count(x509_obj)): + ext = self.get_ext(x509_obj, i) + self._backend.openssl_assert(ext != self._backend._ffi.NULL) + crit = self._backend._lib.X509_EXTENSION_get_critical(ext) critical = crit == 1 oid = x509.ObjectIdentifier( - _obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext)) + _obj2txt( + self._backend, + self._backend._lib.X509_EXTENSION_get_object(ext), + ) ) if oid in seen_oids: raise x509.DuplicateExtension( - "Duplicate {0} extension found".format(oid), oid + "Duplicate {} extension found".format(oid), oid ) # These OIDs are only supported in OpenSSL 1.1.0+ but we want # to support them in all versions of OpenSSL so we decode them # ourselves. if oid == ExtensionOID.TLS_FEATURE: - data = backend._lib.X509_EXTENSION_get_data(ext) - parsed = _Integers.load(_asn1_string_to_bytes(backend, data)) + # The extension contents are a SEQUENCE OF INTEGERs. + data = self._backend._lib.X509_EXTENSION_get_data(ext) + data_bytes = _asn1_string_to_bytes(self._backend, data) + features = DERReader(data_bytes).read_single_element(SEQUENCE) + parsed = [] + while not features.is_empty(): + parsed.append(features.read_element(INTEGER).as_integer()) + # Map the features to their enum value. value = x509.TLSFeature( - [_TLS_FEATURE_TYPE_TO_ENUM[x.native] for x in parsed] + [_TLS_FEATURE_TYPE_TO_ENUM[x] for x in parsed] ) extensions.append(x509.Extension(oid, critical, value)) seen_oids.add(oid) continue elif oid == ExtensionOID.PRECERT_POISON: - data = backend._lib.X509_EXTENSION_get_data(ext) - parsed = asn1crypto.core.Null.load( - _asn1_string_to_bytes(backend, data) + data = self._backend._lib.X509_EXTENSION_get_data(ext) + # The contents of the extension must be an ASN.1 NULL. + reader = DERReader(_asn1_string_to_bytes(self._backend, data)) + reader.read_single_element(NULL).check_empty() + extensions.append( + x509.Extension(oid, critical, x509.PrecertPoison()) ) - assert parsed == asn1crypto.core.Null() - extensions.append(x509.Extension( - oid, critical, x509.PrecertPoison() - )) seen_oids.add(oid) continue @@ -233,23 +239,21 @@ class _X509ExtensionParser(object): handler = self.handlers[oid] except KeyError: # Dump the DER payload into an UnrecognizedExtension object - data = backend._lib.X509_EXTENSION_get_data(ext) - backend.openssl_assert(data != backend._ffi.NULL) - der = backend._ffi.buffer(data.data, data.length)[:] + data = self._backend._lib.X509_EXTENSION_get_data(ext) + self._backend.openssl_assert(data != self._backend._ffi.NULL) + der = self._backend._ffi.buffer(data.data, data.length)[:] unrecognized = x509.UnrecognizedExtension(oid, der) - extensions.append( - x509.Extension(oid, critical, unrecognized) - ) + extensions.append(x509.Extension(oid, critical, unrecognized)) else: - ext_data = backend._lib.X509V3_EXT_d2i(ext) - if ext_data == backend._ffi.NULL: - backend._consume_errors() + ext_data = self._backend._lib.X509V3_EXT_d2i(ext) + if ext_data == self._backend._ffi.NULL: + self._backend._consume_errors() raise ValueError( - "The {0} extension is invalid and can't be " + "The {} extension is invalid and can't be " "parsed".format(oid) ) - value = handler(backend, ext_data) + value = handler(self._backend, ext_data) extensions.append(x509.Extension(oid, critical, value)) seen_oids.add(oid) @@ -271,16 +275,12 @@ def _decode_certificate_policies(backend, cp): qnum = backend._lib.sk_POLICYQUALINFO_num(pi.qualifiers) qualifiers = [] for j in range(qnum): - pqi = backend._lib.sk_POLICYQUALINFO_value( - pi.qualifiers, j - ) - pqualid = x509.ObjectIdentifier( - _obj2txt(backend, pqi.pqualid) - ) + pqi = backend._lib.sk_POLICYQUALINFO_value(pi.qualifiers, j) + pqualid = x509.ObjectIdentifier(_obj2txt(backend, pqi.pqualid)) if pqualid == CertificatePoliciesOID.CPS_QUALIFIER: cpsuri = backend._ffi.buffer( pqi.d.cpsuri.data, pqi.d.cpsuri.length - )[:].decode('ascii') + )[:].decode("ascii") qualifiers.append(cpsuri) else: assert pqualid == CertificatePoliciesOID.CPS_USER_NOTICE @@ -289,9 +289,7 @@ def _decode_certificate_policies(backend, cp): ) qualifiers.append(user_notice) - certificate_policies.append( - x509.PolicyInformation(oid, qualifiers) - ) + certificate_policies.append(x509.PolicyInformation(oid, qualifiers)) return x509.CertificatePolicies(certificate_policies) @@ -304,13 +302,9 @@ def _decode_user_notice(backend, un): explicit_text = _asn1_string_to_utf8(backend, un.exptext) if un.noticeref != backend._ffi.NULL: - organization = _asn1_string_to_utf8( - backend, un.noticeref.organization - ) + organization = _asn1_string_to_utf8(backend, un.noticeref.organization) - num = backend._lib.sk_ASN1_INTEGER_num( - un.noticeref.noticenos - ) + num = backend._lib.sk_ASN1_INTEGER_num(un.noticeref.noticenos) notice_numbers = [] for i in range(num): asn1_int = backend._lib.sk_ASN1_INTEGER_value( @@ -319,9 +313,7 @@ def _decode_user_notice(backend, un): notice_num = _asn1_integer_to_int(backend, asn1_int) notice_numbers.append(notice_num) - notice_reference = x509.NoticeReference( - organization, notice_numbers - ) + notice_reference = x509.NoticeReference(organization, notice_numbers) return x509.UserNotice(notice_reference, explicit_text) @@ -364,9 +356,7 @@ def _decode_authority_key_identifier(backend, akid): )[:] if akid.issuer != backend._ffi.NULL: - authority_cert_issuer = _decode_general_names( - backend, akid.issuer - ) + authority_cert_issuer = _decode_general_names(backend, akid.issuer) authority_cert_serial_number = _asn1_integer_to_int_or_none( backend, akid.serial @@ -377,22 +367,40 @@ def _decode_authority_key_identifier(backend, akid): ) -def _decode_authority_information_access(backend, aia): - aia = backend._ffi.cast("Cryptography_STACK_OF_ACCESS_DESCRIPTION *", aia) - aia = backend._ffi.gc(aia, backend._lib.sk_ACCESS_DESCRIPTION_free) - num = backend._lib.sk_ACCESS_DESCRIPTION_num(aia) +def _decode_information_access(backend, ia): + ia = backend._ffi.cast("Cryptography_STACK_OF_ACCESS_DESCRIPTION *", ia) + ia = backend._ffi.gc( + ia, + lambda x: backend._lib.sk_ACCESS_DESCRIPTION_pop_free( + x, + backend._ffi.addressof( + backend._lib._original_lib, "ACCESS_DESCRIPTION_free" + ), + ), + ) + num = backend._lib.sk_ACCESS_DESCRIPTION_num(ia) access_descriptions = [] for i in range(num): - ad = backend._lib.sk_ACCESS_DESCRIPTION_value(aia, i) + ad = backend._lib.sk_ACCESS_DESCRIPTION_value(ia, i) backend.openssl_assert(ad.method != backend._ffi.NULL) oid = x509.ObjectIdentifier(_obj2txt(backend, ad.method)) backend.openssl_assert(ad.location != backend._ffi.NULL) gn = _decode_general_name(backend, ad.location) access_descriptions.append(x509.AccessDescription(oid, gn)) + return access_descriptions + + +def _decode_authority_information_access(backend, aia): + access_descriptions = _decode_information_access(backend, aia) return x509.AuthorityInformationAccess(access_descriptions) +def _decode_subject_information_access(backend, aia): + access_descriptions = _decode_information_access(backend, aia) + return x509.SubjectInformationAccess(access_descriptions) + + def _decode_key_usage(backend, bit_string): bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string) bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free) @@ -415,7 +423,7 @@ def _decode_key_usage(backend, bit_string): key_cert_sign, crl_sign, encipher_only, - decipher_only + decipher_only, ) @@ -483,8 +491,13 @@ def _decode_issuing_dist_point(backend, idp): only_some_reasons = None return x509.IssuingDistributionPoint( - full_name, relative_name, only_user, only_ca, only_some_reasons, - indirect_crl, only_attr + full_name, + relative_name, + only_user, + only_ca, + only_some_reasons, + indirect_crl, + only_attr, ) @@ -605,13 +618,9 @@ def _decode_distpoint(backend, distpoint): rnum = backend._lib.sk_X509_NAME_ENTRY_num(rns) attributes = set() for i in range(rnum): - rn = backend._lib.sk_X509_NAME_ENTRY_value( - rns, i - ) + rn = backend._lib.sk_X509_NAME_ENTRY_value(rns, i) backend.openssl_assert(rn != backend._ffi.NULL) - attributes.add( - _decode_x509_name_entry(backend, rn) - ) + attributes.add(_decode_x509_name_entry(backend, rn)) relative_name = x509.RelativeDistinguishedName(attributes) @@ -635,10 +644,11 @@ def _decode_inhibit_any_policy(backend, asn1_int): return x509.InhibitAnyPolicy(skip_certs) -def _decode_precert_signed_certificate_timestamps(backend, asn1_scts): +def _decode_scts(backend, asn1_scts): from cryptography.hazmat.backends.openssl.x509 import ( - _SignedCertificateTimestamp + _SignedCertificateTimestamp, ) + asn1_scts = backend._ffi.cast("Cryptography_STACK_OF_SCT *", asn1_scts) asn1_scts = backend._ffi.gc(asn1_scts, backend._lib.SCT_LIST_free) @@ -647,7 +657,17 @@ def _decode_precert_signed_certificate_timestamps(backend, asn1_scts): sct = backend._lib.sk_SCT_value(asn1_scts, i) scts.append(_SignedCertificateTimestamp(backend, asn1_scts, sct)) - return x509.PrecertificateSignedCertificateTimestamps(scts) + return scts + + +def _decode_precert_signed_certificate_timestamps(backend, asn1_scts): + return x509.PrecertificateSignedCertificateTimestamps( + _decode_scts(backend, asn1_scts) + ) + + +def _decode_signed_certificate_timestamps(backend, asn1_scts): + return x509.SignedCertificateTimestamps(_decode_scts(backend, asn1_scts)) # CRLReason ::= ENUMERATED { @@ -686,7 +706,7 @@ _CRL_ENTRY_REASON_ENUM_TO_CODE = { x509.ReasonFlags.certificate_hold: 6, x509.ReasonFlags.remove_from_crl: 8, x509.ReasonFlags.privilege_withdrawn: 9, - x509.ReasonFlags.aa_compromise: 10 + x509.ReasonFlags.aa_compromise: 10, } @@ -698,13 +718,11 @@ def _decode_crl_reason(backend, enum): try: return x509.CRLReason(_CRL_ENTRY_REASON_CODE_TO_ENUM[code]) except KeyError: - raise ValueError("Unsupported reason code: {0}".format(code)) + raise ValueError("Unsupported reason code: {}".format(code)) def _decode_invalidity_date(backend, inv_date): - generalized_time = backend._ffi.cast( - "ASN1_GENERALIZEDTIME *", inv_date - ) + generalized_time = backend._ffi.cast("ASN1_GENERALIZEDTIME *", inv_date) generalized_time = backend._ffi.gc( generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free ) @@ -758,14 +776,14 @@ def _asn1_string_to_utf8(backend, asn1_string): res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string) if res == -1: raise ValueError( - "Unsupported ASN1 string type. Type: {0}".format(asn1_string.type) + "Unsupported ASN1 string type. Type: {}".format(asn1_string.type) ) backend.openssl_assert(buf[0] != backend._ffi.NULL) buf = backend._ffi.gc( buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0]) ) - return backend._ffi.buffer(buf[0], res)[:].decode('utf8') + return backend._ffi.buffer(buf[0], res)[:].decode("utf8") def _parse_asn1_time(backend, asn1_time): @@ -799,7 +817,7 @@ def _decode_nonce(backend, nonce): return x509.OCSPNonce(_asn1_string_to_bytes(backend, nonce)) -_EXTENSION_HANDLERS_NO_SCT = { +_EXTENSION_HANDLERS_BASE = { ExtensionOID.BASIC_CONSTRAINTS: _decode_basic_constraints, ExtensionOID.SUBJECT_KEY_IDENTIFIER: _decode_subject_key_identifier, ExtensionOID.KEY_USAGE: _decode_key_usage, @@ -809,6 +827,9 @@ _EXTENSION_HANDLERS_NO_SCT = { ExtensionOID.AUTHORITY_INFORMATION_ACCESS: ( _decode_authority_information_access ), + ExtensionOID.SUBJECT_INFORMATION_ACCESS: ( + _decode_subject_information_access + ), ExtensionOID.CERTIFICATE_POLICIES: _decode_certificate_policies, ExtensionOID.CRL_DISTRIBUTION_POINTS: _decode_crl_distribution_points, ExtensionOID.FRESHEST_CRL: _decode_freshest_crl, @@ -818,11 +839,11 @@ _EXTENSION_HANDLERS_NO_SCT = { ExtensionOID.NAME_CONSTRAINTS: _decode_name_constraints, ExtensionOID.POLICY_CONSTRAINTS: _decode_policy_constraints, } -_EXTENSION_HANDLERS = _EXTENSION_HANDLERS_NO_SCT.copy() -_EXTENSION_HANDLERS[ - ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS -] = _decode_precert_signed_certificate_timestamps - +_EXTENSION_HANDLERS_SCT = { + ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS: ( + _decode_precert_signed_certificate_timestamps + ) +} _REVOKED_EXTENSION_HANDLERS = { CRLEntryExtensionOID.CRL_REASON: _decode_crl_reason, @@ -839,6 +860,7 @@ _CRL_EXTENSION_HANDLERS = { _decode_authority_information_access ), ExtensionOID.ISSUING_DISTRIBUTION_POINT: _decode_issuing_dist_point, + ExtensionOID.FRESHEST_CRL: _decode_freshest_crl, } _OCSP_REQ_EXTENSION_HANDLERS = { @@ -849,44 +871,8 @@ _OCSP_BASICRESP_EXTENSION_HANDLERS = { OCSPExtensionOID.NONCE: _decode_nonce, } -_CERTIFICATE_EXTENSION_PARSER_NO_SCT = _X509ExtensionParser( - ext_count=lambda backend, x: backend._lib.X509_get_ext_count(x), - get_ext=lambda backend, x, i: backend._lib.X509_get_ext(x, i), - handlers=_EXTENSION_HANDLERS_NO_SCT -) - -_CERTIFICATE_EXTENSION_PARSER = _X509ExtensionParser( - ext_count=lambda backend, x: backend._lib.X509_get_ext_count(x), - get_ext=lambda backend, x, i: backend._lib.X509_get_ext(x, i), - handlers=_EXTENSION_HANDLERS -) - -_CSR_EXTENSION_PARSER = _X509ExtensionParser( - ext_count=lambda backend, x: backend._lib.sk_X509_EXTENSION_num(x), - get_ext=lambda backend, x, i: backend._lib.sk_X509_EXTENSION_value(x, i), - handlers=_EXTENSION_HANDLERS -) - -_REVOKED_CERTIFICATE_EXTENSION_PARSER = _X509ExtensionParser( - ext_count=lambda backend, x: backend._lib.X509_REVOKED_get_ext_count(x), - get_ext=lambda backend, x, i: backend._lib.X509_REVOKED_get_ext(x, i), - handlers=_REVOKED_EXTENSION_HANDLERS, -) - -_CRL_EXTENSION_PARSER = _X509ExtensionParser( - ext_count=lambda backend, x: backend._lib.X509_CRL_get_ext_count(x), - get_ext=lambda backend, x, i: backend._lib.X509_CRL_get_ext(x, i), - handlers=_CRL_EXTENSION_HANDLERS, -) - -_OCSP_REQ_EXT_PARSER = _X509ExtensionParser( - ext_count=lambda backend, x: backend._lib.OCSP_REQUEST_get_ext_count(x), - get_ext=lambda backend, x, i: backend._lib.OCSP_REQUEST_get_ext(x, i), - handlers=_OCSP_REQ_EXTENSION_HANDLERS, -) - -_OCSP_BASICRESP_EXT_PARSER = _X509ExtensionParser( - ext_count=lambda backend, x: backend._lib.OCSP_BASICRESP_get_ext_count(x), - get_ext=lambda backend, x, i: backend._lib.OCSP_BASICRESP_get_ext(x, i), - handlers=_OCSP_BASICRESP_EXTENSION_HANDLERS, -) +_OCSP_SINGLERESP_EXTENSION_HANDLERS_SCT = { + ExtensionOID.SIGNED_CERTIFICATE_TIMESTAMPS: ( + _decode_signed_certificate_timestamps + ) +} |