summaryrefslogtreecommitdiff
path: root/src/cryptography/hazmat/backends/openssl/decode_asn1.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/cryptography/hazmat/backends/openssl/decode_asn1.py')
-rw-r--r--src/cryptography/hazmat/backends/openssl/decode_asn1.py260
1 files changed, 123 insertions, 137 deletions
diff --git a/src/cryptography/hazmat/backends/openssl/decode_asn1.py b/src/cryptography/hazmat/backends/openssl/decode_asn1.py
index 007675d4e..cc9b8c0e3 100644
--- a/src/cryptography/hazmat/backends/openssl/decode_asn1.py
+++ b/src/cryptography/hazmat/backends/openssl/decode_asn1.py
@@ -7,23 +7,20 @@ from __future__ import absolute_import, division, print_function
import datetime
import ipaddress
-import asn1crypto.core
-
import six
from cryptography import x509
+from cryptography.hazmat._der import DERReader, INTEGER, NULL, SEQUENCE
from cryptography.x509.extensions import _TLS_FEATURE_TYPE_TO_ENUM
from cryptography.x509.name import _ASN1_TYPE_TO_ENUM
from cryptography.x509.oid import (
- CRLEntryExtensionOID, CertificatePoliciesOID, ExtensionOID,
+ CRLEntryExtensionOID,
+ CertificatePoliciesOID,
+ ExtensionOID,
OCSPExtensionOID,
)
-class _Integers(asn1crypto.core.SequenceOf):
- _child_spec = asn1crypto.core.Integer
-
-
def _obj2txt(backend, obj):
# Set to 80 on the recommendation of
# https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
@@ -67,9 +64,9 @@ def _decode_x509_name(backend, x509_name):
for x in range(count):
entry = backend._lib.X509_NAME_get_entry(x509_name, x)
attribute = _decode_x509_name_entry(backend, entry)
- set_id = backend._lib.Cryptography_X509_NAME_ENTRY_set(entry)
+ set_id = backend._lib.X509_NAME_ENTRY_set(entry)
if set_id != prev_set_id:
- attributes.append(set([attribute]))
+ attributes.append({attribute})
else:
# is in the same RDN a previous entry
attributes[-1].add(attribute)
@@ -124,10 +121,10 @@ def _decode_general_name(backend, gn):
# netmask. To handle this we convert the netmask to integer, then
# find the first 0 bit, which will be the prefix. If another 1
# bit is present after that the netmask is invalid.
- base = ipaddress.ip_address(data[:data_len // 2])
- netmask = ipaddress.ip_address(data[data_len // 2:])
+ base = ipaddress.ip_address(data[: data_len // 2])
+ netmask = ipaddress.ip_address(data[data_len // 2 :])
bits = bin(int(netmask))[2:]
- prefix = bits.find('0')
+ prefix = bits.find("0")
# If no 0 bits are found it is a /32 or /128
if prefix == -1:
prefix = len(bits)
@@ -135,7 +132,7 @@ def _decode_general_name(backend, gn):
if "1" in bits[prefix:]:
raise ValueError("Invalid netmask")
- ip = ipaddress.ip_network(base.exploded + u"/{0}".format(prefix))
+ ip = ipaddress.ip_network(base.exploded + u"/{}".format(prefix))
else:
ip = ipaddress.ip_address(data)
@@ -160,10 +157,10 @@ def _decode_general_name(backend, gn):
else:
# x400Address or ediPartyName
raise x509.UnsupportedGeneralNameType(
- "{0} is not a supported type".format(
+ "{} is not a supported type".format(
x509._GENERAL_NAMES.get(gn.type, gn.type)
),
- gn.type
+ gn.type,
)
@@ -184,48 +181,57 @@ def _decode_delta_crl_indicator(backend, ext):
class _X509ExtensionParser(object):
- def __init__(self, ext_count, get_ext, handlers):
+ def __init__(self, backend, ext_count, get_ext, handlers):
self.ext_count = ext_count
self.get_ext = get_ext
self.handlers = handlers
+ self._backend = backend
- def parse(self, backend, x509_obj):
+ def parse(self, x509_obj):
extensions = []
seen_oids = set()
- for i in range(self.ext_count(backend, x509_obj)):
- ext = self.get_ext(backend, x509_obj, i)
- backend.openssl_assert(ext != backend._ffi.NULL)
- crit = backend._lib.X509_EXTENSION_get_critical(ext)
+ for i in range(self.ext_count(x509_obj)):
+ ext = self.get_ext(x509_obj, i)
+ self._backend.openssl_assert(ext != self._backend._ffi.NULL)
+ crit = self._backend._lib.X509_EXTENSION_get_critical(ext)
critical = crit == 1
oid = x509.ObjectIdentifier(
- _obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
+ _obj2txt(
+ self._backend,
+ self._backend._lib.X509_EXTENSION_get_object(ext),
+ )
)
if oid in seen_oids:
raise x509.DuplicateExtension(
- "Duplicate {0} extension found".format(oid), oid
+ "Duplicate {} extension found".format(oid), oid
)
# These OIDs are only supported in OpenSSL 1.1.0+ but we want
# to support them in all versions of OpenSSL so we decode them
# ourselves.
if oid == ExtensionOID.TLS_FEATURE:
- data = backend._lib.X509_EXTENSION_get_data(ext)
- parsed = _Integers.load(_asn1_string_to_bytes(backend, data))
+ # The extension contents are a SEQUENCE OF INTEGERs.
+ data = self._backend._lib.X509_EXTENSION_get_data(ext)
+ data_bytes = _asn1_string_to_bytes(self._backend, data)
+ features = DERReader(data_bytes).read_single_element(SEQUENCE)
+ parsed = []
+ while not features.is_empty():
+ parsed.append(features.read_element(INTEGER).as_integer())
+ # Map the features to their enum value.
value = x509.TLSFeature(
- [_TLS_FEATURE_TYPE_TO_ENUM[x.native] for x in parsed]
+ [_TLS_FEATURE_TYPE_TO_ENUM[x] for x in parsed]
)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
continue
elif oid == ExtensionOID.PRECERT_POISON:
- data = backend._lib.X509_EXTENSION_get_data(ext)
- parsed = asn1crypto.core.Null.load(
- _asn1_string_to_bytes(backend, data)
+ data = self._backend._lib.X509_EXTENSION_get_data(ext)
+ # The contents of the extension must be an ASN.1 NULL.
+ reader = DERReader(_asn1_string_to_bytes(self._backend, data))
+ reader.read_single_element(NULL).check_empty()
+ extensions.append(
+ x509.Extension(oid, critical, x509.PrecertPoison())
)
- assert parsed == asn1crypto.core.Null()
- extensions.append(x509.Extension(
- oid, critical, x509.PrecertPoison()
- ))
seen_oids.add(oid)
continue
@@ -233,23 +239,21 @@ class _X509ExtensionParser(object):
handler = self.handlers[oid]
except KeyError:
# Dump the DER payload into an UnrecognizedExtension object
- data = backend._lib.X509_EXTENSION_get_data(ext)
- backend.openssl_assert(data != backend._ffi.NULL)
- der = backend._ffi.buffer(data.data, data.length)[:]
+ data = self._backend._lib.X509_EXTENSION_get_data(ext)
+ self._backend.openssl_assert(data != self._backend._ffi.NULL)
+ der = self._backend._ffi.buffer(data.data, data.length)[:]
unrecognized = x509.UnrecognizedExtension(oid, der)
- extensions.append(
- x509.Extension(oid, critical, unrecognized)
- )
+ extensions.append(x509.Extension(oid, critical, unrecognized))
else:
- ext_data = backend._lib.X509V3_EXT_d2i(ext)
- if ext_data == backend._ffi.NULL:
- backend._consume_errors()
+ ext_data = self._backend._lib.X509V3_EXT_d2i(ext)
+ if ext_data == self._backend._ffi.NULL:
+ self._backend._consume_errors()
raise ValueError(
- "The {0} extension is invalid and can't be "
+ "The {} extension is invalid and can't be "
"parsed".format(oid)
)
- value = handler(backend, ext_data)
+ value = handler(self._backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
@@ -271,16 +275,12 @@ def _decode_certificate_policies(backend, cp):
qnum = backend._lib.sk_POLICYQUALINFO_num(pi.qualifiers)
qualifiers = []
for j in range(qnum):
- pqi = backend._lib.sk_POLICYQUALINFO_value(
- pi.qualifiers, j
- )
- pqualid = x509.ObjectIdentifier(
- _obj2txt(backend, pqi.pqualid)
- )
+ pqi = backend._lib.sk_POLICYQUALINFO_value(pi.qualifiers, j)
+ pqualid = x509.ObjectIdentifier(_obj2txt(backend, pqi.pqualid))
if pqualid == CertificatePoliciesOID.CPS_QUALIFIER:
cpsuri = backend._ffi.buffer(
pqi.d.cpsuri.data, pqi.d.cpsuri.length
- )[:].decode('ascii')
+ )[:].decode("ascii")
qualifiers.append(cpsuri)
else:
assert pqualid == CertificatePoliciesOID.CPS_USER_NOTICE
@@ -289,9 +289,7 @@ def _decode_certificate_policies(backend, cp):
)
qualifiers.append(user_notice)
- certificate_policies.append(
- x509.PolicyInformation(oid, qualifiers)
- )
+ certificate_policies.append(x509.PolicyInformation(oid, qualifiers))
return x509.CertificatePolicies(certificate_policies)
@@ -304,13 +302,9 @@ def _decode_user_notice(backend, un):
explicit_text = _asn1_string_to_utf8(backend, un.exptext)
if un.noticeref != backend._ffi.NULL:
- organization = _asn1_string_to_utf8(
- backend, un.noticeref.organization
- )
+ organization = _asn1_string_to_utf8(backend, un.noticeref.organization)
- num = backend._lib.sk_ASN1_INTEGER_num(
- un.noticeref.noticenos
- )
+ num = backend._lib.sk_ASN1_INTEGER_num(un.noticeref.noticenos)
notice_numbers = []
for i in range(num):
asn1_int = backend._lib.sk_ASN1_INTEGER_value(
@@ -319,9 +313,7 @@ def _decode_user_notice(backend, un):
notice_num = _asn1_integer_to_int(backend, asn1_int)
notice_numbers.append(notice_num)
- notice_reference = x509.NoticeReference(
- organization, notice_numbers
- )
+ notice_reference = x509.NoticeReference(organization, notice_numbers)
return x509.UserNotice(notice_reference, explicit_text)
@@ -364,9 +356,7 @@ def _decode_authority_key_identifier(backend, akid):
)[:]
if akid.issuer != backend._ffi.NULL:
- authority_cert_issuer = _decode_general_names(
- backend, akid.issuer
- )
+ authority_cert_issuer = _decode_general_names(backend, akid.issuer)
authority_cert_serial_number = _asn1_integer_to_int_or_none(
backend, akid.serial
@@ -377,22 +367,40 @@ def _decode_authority_key_identifier(backend, akid):
)
-def _decode_authority_information_access(backend, aia):
- aia = backend._ffi.cast("Cryptography_STACK_OF_ACCESS_DESCRIPTION *", aia)
- aia = backend._ffi.gc(aia, backend._lib.sk_ACCESS_DESCRIPTION_free)
- num = backend._lib.sk_ACCESS_DESCRIPTION_num(aia)
+def _decode_information_access(backend, ia):
+ ia = backend._ffi.cast("Cryptography_STACK_OF_ACCESS_DESCRIPTION *", ia)
+ ia = backend._ffi.gc(
+ ia,
+ lambda x: backend._lib.sk_ACCESS_DESCRIPTION_pop_free(
+ x,
+ backend._ffi.addressof(
+ backend._lib._original_lib, "ACCESS_DESCRIPTION_free"
+ ),
+ ),
+ )
+ num = backend._lib.sk_ACCESS_DESCRIPTION_num(ia)
access_descriptions = []
for i in range(num):
- ad = backend._lib.sk_ACCESS_DESCRIPTION_value(aia, i)
+ ad = backend._lib.sk_ACCESS_DESCRIPTION_value(ia, i)
backend.openssl_assert(ad.method != backend._ffi.NULL)
oid = x509.ObjectIdentifier(_obj2txt(backend, ad.method))
backend.openssl_assert(ad.location != backend._ffi.NULL)
gn = _decode_general_name(backend, ad.location)
access_descriptions.append(x509.AccessDescription(oid, gn))
+ return access_descriptions
+
+
+def _decode_authority_information_access(backend, aia):
+ access_descriptions = _decode_information_access(backend, aia)
return x509.AuthorityInformationAccess(access_descriptions)
+def _decode_subject_information_access(backend, aia):
+ access_descriptions = _decode_information_access(backend, aia)
+ return x509.SubjectInformationAccess(access_descriptions)
+
+
def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
@@ -415,7 +423,7 @@ def _decode_key_usage(backend, bit_string):
key_cert_sign,
crl_sign,
encipher_only,
- decipher_only
+ decipher_only,
)
@@ -483,8 +491,13 @@ def _decode_issuing_dist_point(backend, idp):
only_some_reasons = None
return x509.IssuingDistributionPoint(
- full_name, relative_name, only_user, only_ca, only_some_reasons,
- indirect_crl, only_attr
+ full_name,
+ relative_name,
+ only_user,
+ only_ca,
+ only_some_reasons,
+ indirect_crl,
+ only_attr,
)
@@ -605,13 +618,9 @@ def _decode_distpoint(backend, distpoint):
rnum = backend._lib.sk_X509_NAME_ENTRY_num(rns)
attributes = set()
for i in range(rnum):
- rn = backend._lib.sk_X509_NAME_ENTRY_value(
- rns, i
- )
+ rn = backend._lib.sk_X509_NAME_ENTRY_value(rns, i)
backend.openssl_assert(rn != backend._ffi.NULL)
- attributes.add(
- _decode_x509_name_entry(backend, rn)
- )
+ attributes.add(_decode_x509_name_entry(backend, rn))
relative_name = x509.RelativeDistinguishedName(attributes)
@@ -635,10 +644,11 @@ def _decode_inhibit_any_policy(backend, asn1_int):
return x509.InhibitAnyPolicy(skip_certs)
-def _decode_precert_signed_certificate_timestamps(backend, asn1_scts):
+def _decode_scts(backend, asn1_scts):
from cryptography.hazmat.backends.openssl.x509 import (
- _SignedCertificateTimestamp
+ _SignedCertificateTimestamp,
)
+
asn1_scts = backend._ffi.cast("Cryptography_STACK_OF_SCT *", asn1_scts)
asn1_scts = backend._ffi.gc(asn1_scts, backend._lib.SCT_LIST_free)
@@ -647,7 +657,17 @@ def _decode_precert_signed_certificate_timestamps(backend, asn1_scts):
sct = backend._lib.sk_SCT_value(asn1_scts, i)
scts.append(_SignedCertificateTimestamp(backend, asn1_scts, sct))
- return x509.PrecertificateSignedCertificateTimestamps(scts)
+ return scts
+
+
+def _decode_precert_signed_certificate_timestamps(backend, asn1_scts):
+ return x509.PrecertificateSignedCertificateTimestamps(
+ _decode_scts(backend, asn1_scts)
+ )
+
+
+def _decode_signed_certificate_timestamps(backend, asn1_scts):
+ return x509.SignedCertificateTimestamps(_decode_scts(backend, asn1_scts))
# CRLReason ::= ENUMERATED {
@@ -686,7 +706,7 @@ _CRL_ENTRY_REASON_ENUM_TO_CODE = {
x509.ReasonFlags.certificate_hold: 6,
x509.ReasonFlags.remove_from_crl: 8,
x509.ReasonFlags.privilege_withdrawn: 9,
- x509.ReasonFlags.aa_compromise: 10
+ x509.ReasonFlags.aa_compromise: 10,
}
@@ -698,13 +718,11 @@ def _decode_crl_reason(backend, enum):
try:
return x509.CRLReason(_CRL_ENTRY_REASON_CODE_TO_ENUM[code])
except KeyError:
- raise ValueError("Unsupported reason code: {0}".format(code))
+ raise ValueError("Unsupported reason code: {}".format(code))
def _decode_invalidity_date(backend, inv_date):
- generalized_time = backend._ffi.cast(
- "ASN1_GENERALIZEDTIME *", inv_date
- )
+ generalized_time = backend._ffi.cast("ASN1_GENERALIZEDTIME *", inv_date)
generalized_time = backend._ffi.gc(
generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free
)
@@ -758,14 +776,14 @@ def _asn1_string_to_utf8(backend, asn1_string):
res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
if res == -1:
raise ValueError(
- "Unsupported ASN1 string type. Type: {0}".format(asn1_string.type)
+ "Unsupported ASN1 string type. Type: {}".format(asn1_string.type)
)
backend.openssl_assert(buf[0] != backend._ffi.NULL)
buf = backend._ffi.gc(
buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
)
- return backend._ffi.buffer(buf[0], res)[:].decode('utf8')
+ return backend._ffi.buffer(buf[0], res)[:].decode("utf8")
def _parse_asn1_time(backend, asn1_time):
@@ -799,7 +817,7 @@ def _decode_nonce(backend, nonce):
return x509.OCSPNonce(_asn1_string_to_bytes(backend, nonce))
-_EXTENSION_HANDLERS_NO_SCT = {
+_EXTENSION_HANDLERS_BASE = {
ExtensionOID.BASIC_CONSTRAINTS: _decode_basic_constraints,
ExtensionOID.SUBJECT_KEY_IDENTIFIER: _decode_subject_key_identifier,
ExtensionOID.KEY_USAGE: _decode_key_usage,
@@ -809,6 +827,9 @@ _EXTENSION_HANDLERS_NO_SCT = {
ExtensionOID.AUTHORITY_INFORMATION_ACCESS: (
_decode_authority_information_access
),
+ ExtensionOID.SUBJECT_INFORMATION_ACCESS: (
+ _decode_subject_information_access
+ ),
ExtensionOID.CERTIFICATE_POLICIES: _decode_certificate_policies,
ExtensionOID.CRL_DISTRIBUTION_POINTS: _decode_crl_distribution_points,
ExtensionOID.FRESHEST_CRL: _decode_freshest_crl,
@@ -818,11 +839,11 @@ _EXTENSION_HANDLERS_NO_SCT = {
ExtensionOID.NAME_CONSTRAINTS: _decode_name_constraints,
ExtensionOID.POLICY_CONSTRAINTS: _decode_policy_constraints,
}
-_EXTENSION_HANDLERS = _EXTENSION_HANDLERS_NO_SCT.copy()
-_EXTENSION_HANDLERS[
- ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS
-] = _decode_precert_signed_certificate_timestamps
-
+_EXTENSION_HANDLERS_SCT = {
+ ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS: (
+ _decode_precert_signed_certificate_timestamps
+ )
+}
_REVOKED_EXTENSION_HANDLERS = {
CRLEntryExtensionOID.CRL_REASON: _decode_crl_reason,
@@ -839,6 +860,7 @@ _CRL_EXTENSION_HANDLERS = {
_decode_authority_information_access
),
ExtensionOID.ISSUING_DISTRIBUTION_POINT: _decode_issuing_dist_point,
+ ExtensionOID.FRESHEST_CRL: _decode_freshest_crl,
}
_OCSP_REQ_EXTENSION_HANDLERS = {
@@ -849,44 +871,8 @@ _OCSP_BASICRESP_EXTENSION_HANDLERS = {
OCSPExtensionOID.NONCE: _decode_nonce,
}
-_CERTIFICATE_EXTENSION_PARSER_NO_SCT = _X509ExtensionParser(
- ext_count=lambda backend, x: backend._lib.X509_get_ext_count(x),
- get_ext=lambda backend, x, i: backend._lib.X509_get_ext(x, i),
- handlers=_EXTENSION_HANDLERS_NO_SCT
-)
-
-_CERTIFICATE_EXTENSION_PARSER = _X509ExtensionParser(
- ext_count=lambda backend, x: backend._lib.X509_get_ext_count(x),
- get_ext=lambda backend, x, i: backend._lib.X509_get_ext(x, i),
- handlers=_EXTENSION_HANDLERS
-)
-
-_CSR_EXTENSION_PARSER = _X509ExtensionParser(
- ext_count=lambda backend, x: backend._lib.sk_X509_EXTENSION_num(x),
- get_ext=lambda backend, x, i: backend._lib.sk_X509_EXTENSION_value(x, i),
- handlers=_EXTENSION_HANDLERS
-)
-
-_REVOKED_CERTIFICATE_EXTENSION_PARSER = _X509ExtensionParser(
- ext_count=lambda backend, x: backend._lib.X509_REVOKED_get_ext_count(x),
- get_ext=lambda backend, x, i: backend._lib.X509_REVOKED_get_ext(x, i),
- handlers=_REVOKED_EXTENSION_HANDLERS,
-)
-
-_CRL_EXTENSION_PARSER = _X509ExtensionParser(
- ext_count=lambda backend, x: backend._lib.X509_CRL_get_ext_count(x),
- get_ext=lambda backend, x, i: backend._lib.X509_CRL_get_ext(x, i),
- handlers=_CRL_EXTENSION_HANDLERS,
-)
-
-_OCSP_REQ_EXT_PARSER = _X509ExtensionParser(
- ext_count=lambda backend, x: backend._lib.OCSP_REQUEST_get_ext_count(x),
- get_ext=lambda backend, x, i: backend._lib.OCSP_REQUEST_get_ext(x, i),
- handlers=_OCSP_REQ_EXTENSION_HANDLERS,
-)
-
-_OCSP_BASICRESP_EXT_PARSER = _X509ExtensionParser(
- ext_count=lambda backend, x: backend._lib.OCSP_BASICRESP_get_ext_count(x),
- get_ext=lambda backend, x, i: backend._lib.OCSP_BASICRESP_get_ext(x, i),
- handlers=_OCSP_BASICRESP_EXTENSION_HANDLERS,
-)
+_OCSP_SINGLERESP_EXTENSION_HANDLERS_SCT = {
+ ExtensionOID.SIGNED_CERTIFICATE_TIMESTAMPS: (
+ _decode_signed_certificate_timestamps
+ )
+}