summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Kehrer <paul.l.kehrer@gmail.com>2018-10-02 07:54:31 +0800
committerAlex Gaynor <alex.gaynor@gmail.com>2018-10-01 19:54:31 -0400
commita07de31096767abd3b4529ae29c0487c8f21310b (patch)
tree28eb5577ebb97209332c50b17c92e29e981ac644
parent1717f8c998b22fbbebec4b5514aee42fb3a2f68d (diff)
downloadcryptography-a07de31096767abd3b4529ae29c0487c8f21310b.tar.gz
support OCSP response parsing (#4452)
* support OCSP response parsing * move the decorator to make pep8 happy * add some missing docs * review feedback * more review feedback
-rw-r--r--docs/x509/ocsp.rst21
-rw-r--r--src/cryptography/hazmat/backends/openssl/backend.py14
-rw-r--r--src/cryptography/hazmat/backends/openssl/ocsp.py244
-rw-r--r--src/cryptography/x509/ocsp.py8
-rw-r--r--tests/x509/test_ocsp.py137
5 files changed, 420 insertions, 4 deletions
diff --git a/docs/x509/ocsp.rst b/docs/x509/ocsp.rst
index 14d9bb849..b20302260 100644
--- a/docs/x509/ocsp.rst
+++ b/docs/x509/ocsp.rst
@@ -74,6 +74,7 @@ OCSP
b",\xdez\t\xbe1\x1bC\xbc\x1c*MSX\x02\x15\x00\x98\xd9\xe5\xc0\xb4\xc3"
b"sU-\xf7|]\x0f\x1e\xb5\x12\x8eIE\xf9"
)
+ der_ocsp_resp_unauth = b"0\x03\n\x01\x06"
OCSP (Online Certificate Status Protocol) is a method of checking the
revocation status of certificates. It is specified in :rfc:`6960`, as well
@@ -151,6 +152,26 @@ Creating Requests
>>> base64.b64encode(req.public_bytes(serialization.Encoding.DER))
b'MEMwQTA/MD0wOzAJBgUrDgMCGgUABBRAC0Z68eay0wmDug1gfn5ZN0gkxAQUw5zz/NNGCDS7zkZ/oHxb8+IIy1kCAj8g'
+Loading Responses
+~~~~~~~~~~~~~~~~~
+
+.. function:: load_der_ocsp_response(data)
+
+ .. versionadded:: 2.4
+
+ Deserialize an OCSP response from DER encoded data.
+
+ :param bytes data: The DER encoded OCSP response data.
+
+ :returns: An instance of :class:`~cryptography.x509.ocsp.OCSPResponse`.
+
+ .. doctest::
+
+ >>> from cryptography.x509 import ocsp
+ >>> ocsp_resp = ocsp.load_der_ocsp_response(der_ocsp_resp_unauth)
+ >>> print(ocsp_resp.response_status)
+ OCSPResponseStatus.UNAUTHORIZED
+
Interfaces
~~~~~~~~~~
diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py
index 8b4792b42..f374a8e38 100644
--- a/src/cryptography/hazmat/backends/openssl/backend.py
+++ b/src/cryptography/hazmat/backends/openssl/backend.py
@@ -42,7 +42,9 @@ from cryptography.hazmat.backends.openssl.encode_asn1 import (
)
from cryptography.hazmat.backends.openssl.hashes import _HashContext
from cryptography.hazmat.backends.openssl.hmac import _HMACContext
-from cryptography.hazmat.backends.openssl.ocsp import _OCSPRequest
+from cryptography.hazmat.backends.openssl.ocsp import (
+ _OCSPRequest, _OCSPResponse
+)
from cryptography.hazmat.backends.openssl.rsa import (
_RSAPrivateKey, _RSAPublicKey
)
@@ -1441,6 +1443,16 @@ class Backend(object):
request = self._ffi.gc(request, self._lib.OCSP_REQUEST_free)
return _OCSPRequest(self, request)
+ def load_der_ocsp_response(self, data):
+ mem_bio = self._bytes_to_bio(data)
+ response = self._lib.d2i_OCSP_RESPONSE_bio(mem_bio.bio, self._ffi.NULL)
+ if response == self._ffi.NULL:
+ self._consume_errors()
+ raise ValueError("Unable to load OCSP response")
+
+ response = self._ffi.gc(response, self._lib.OCSP_RESPONSE_free)
+ return _OCSPResponse(self, response)
+
def create_ocsp_request(self, builder):
ocsp_req = self._lib.OCSP_REQUEST_new()
self.openssl_assert(ocsp_req != self._ffi.NULL)
diff --git a/src/cryptography/hazmat/backends/openssl/ocsp.py b/src/cryptography/hazmat/backends/openssl/ocsp.py
index 420d7eb6a..f3f18cb08 100644
--- a/src/cryptography/hazmat/backends/openssl/ocsp.py
+++ b/src/cryptography/hazmat/backends/openssl/ocsp.py
@@ -4,13 +4,35 @@
from __future__ import absolute_import, division, print_function
-from cryptography import utils
+import functools
+
+from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl.decode_asn1 import (
- _OCSP_REQ_EXT_PARSER, _asn1_integer_to_int, _asn1_string_to_bytes, _obj2txt
+ _CRL_ENTRY_REASON_CODE_TO_ENUM, _OCSP_REQ_EXT_PARSER, _asn1_integer_to_int,
+ _asn1_string_to_bytes, _decode_x509_name, _obj2txt,
+ _parse_asn1_generalized_time,
)
+from cryptography.hazmat.backends.openssl.x509 import _Certificate
from cryptography.hazmat.primitives import serialization
-from cryptography.x509.ocsp import OCSPRequest, _OIDS_TO_HASH
+from cryptography.x509.ocsp import (
+ OCSPCertStatus, OCSPRequest, OCSPResponse, OCSPResponseStatus,
+ _CERT_STATUS_TO_ENUM, _OIDS_TO_HASH, _RESPONSE_STATUS_TO_ENUM,
+)
+
+
+def _requires_successful_response(func):
+ @functools.wraps(func)
+ def wrapper(self, *args):
+ if self.response_status != OCSPResponseStatus.SUCCESSFUL:
+ raise ValueError(
+ "OCSP response status is not successful so the property "
+ "has no value"
+ )
+ else:
+ return func(self, *args)
+
+ return wrapper
def _issuer_key_hash(backend, cert_id):
@@ -63,6 +85,222 @@ def _hash_algorithm(backend, cert_id):
)
+@utils.register_interface(OCSPResponse)
+class _OCSPResponse(object):
+ def __init__(self, backend, ocsp_response):
+ self._backend = backend
+ self._ocsp_response = ocsp_response
+ status = self._backend._lib.OCSP_response_status(self._ocsp_response)
+ self._backend.openssl_assert(status in _RESPONSE_STATUS_TO_ENUM)
+ self._status = _RESPONSE_STATUS_TO_ENUM[status]
+ if self._status is OCSPResponseStatus.SUCCESSFUL:
+ basic = self._backend._lib.OCSP_response_get1_basic(
+ self._ocsp_response
+ )
+ self._backend.openssl_assert(basic != self._backend._ffi.NULL)
+ self._basic = self._backend._ffi.gc(
+ basic, self._backend._lib.OCSP_BASICRESP_free
+ )
+ self._backend.openssl_assert(
+ self._backend._lib.OCSP_resp_count(self._basic) == 1
+ )
+ self._single = self._backend._lib.OCSP_resp_get0(self._basic, 0)
+ self._backend.openssl_assert(
+ self._single != self._backend._ffi.NULL
+ )
+ self._cert_id = self._backend._lib.OCSP_SINGLERESP_get0_id(
+ self._single
+ )
+ self._backend.openssl_assert(
+ self._cert_id != self._backend._ffi.NULL
+ )
+
+ response_status = utils.read_only_property("_status")
+
+ @property
+ @_requires_successful_response
+ def signature_algorithm_oid(self):
+ alg = self._backend._lib.OCSP_resp_get0_tbs_sigalg(self._basic)
+ self._backend.openssl_assert(alg != self._backend._ffi.NULL)
+ oid = _obj2txt(self._backend, alg.algorithm)
+ return x509.ObjectIdentifier(oid)
+
+ @property
+ @_requires_successful_response
+ def signature(self):
+ sig = self._backend._lib.OCSP_resp_get0_signature(self._basic)
+ self._backend.openssl_assert(sig != self._backend._ffi.NULL)
+ return _asn1_string_to_bytes(self._backend, sig)
+
+ @property
+ @_requires_successful_response
+ def tbs_response_bytes(self):
+ respdata = self._backend._lib.OCSP_resp_get0_respdata(self._basic)
+ self._backend.openssl_assert(respdata != self._backend._ffi.NULL)
+ pp = self._backend._ffi.new("unsigned char **")
+ res = self._backend._lib.i2d_OCSP_RESPDATA(respdata, pp)
+ self._backend.openssl_assert(pp[0] != self._backend._ffi.NULL)
+ pp = self._backend._ffi.gc(
+ pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
+ )
+ self._backend.openssl_assert(res > 0)
+ return self._backend._ffi.buffer(pp[0], res)[:]
+
+ @property
+ @_requires_successful_response
+ def certificates(self):
+ sk_x509 = self._backend._lib.OCSP_resp_get0_certs(self._basic)
+ num = self._backend._lib.sk_X509_num(sk_x509)
+ certs = []
+ for i in range(num):
+ x509 = self._backend._lib.sk_X509_value(sk_x509, i)
+ self._backend.openssl_assert(x509 != self._backend._ffi.NULL)
+ cert = _Certificate(self._backend, x509)
+ # We need to keep the OCSP response that the certificate came from
+ # alive until the Certificate object itself goes out of scope, so
+ # we give it a private reference.
+ cert._ocsp_resp = self
+ certs.append(cert)
+
+ return certs
+
+ @property
+ @_requires_successful_response
+ def responder_key_hash(self):
+ _, asn1_string = self._responder_key_name()
+ if asn1_string == self._backend._ffi.NULL:
+ return None
+ else:
+ return _asn1_string_to_bytes(self._backend, asn1_string)
+
+ @property
+ @_requires_successful_response
+ def responder_name(self):
+ x509_name, _ = self._responder_key_name()
+ if x509_name == self._backend._ffi.NULL:
+ return None
+ else:
+ return _decode_x509_name(self._backend, x509_name)
+
+ def _responder_key_name(self):
+ asn1_string = self._backend._ffi.new("ASN1_OCTET_STRING **")
+ x509_name = self._backend._ffi.new("X509_NAME **")
+ res = self._backend._lib.OCSP_resp_get0_id(
+ self._basic, asn1_string, x509_name
+ )
+ self._backend.openssl_assert(res == 1)
+ return x509_name[0], asn1_string[0]
+
+ @property
+ @_requires_successful_response
+ def produced_at(self):
+ produced_at = self._backend._lib.OCSP_resp_get0_produced_at(
+ self._basic
+ )
+ return _parse_asn1_generalized_time(self._backend, produced_at)
+
+ @property
+ @_requires_successful_response
+ def certificate_status(self):
+ status = self._backend._lib.OCSP_single_get0_status(
+ self._single,
+ self._backend._ffi.NULL,
+ self._backend._ffi.NULL,
+ self._backend._ffi.NULL,
+ self._backend._ffi.NULL,
+ )
+ self._backend.openssl_assert(status in _CERT_STATUS_TO_ENUM)
+ return _CERT_STATUS_TO_ENUM[status]
+
+ @property
+ @_requires_successful_response
+ def revocation_time(self):
+ if self.certificate_status is not OCSPCertStatus.REVOKED:
+ return None
+
+ asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
+ self._backend._lib.OCSP_single_get0_status(
+ self._single,
+ self._backend._ffi.NULL,
+ asn1_time,
+ self._backend._ffi.NULL,
+ self._backend._ffi.NULL,
+ )
+ self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL)
+ return _parse_asn1_generalized_time(self._backend, asn1_time[0])
+
+ @property
+ @_requires_successful_response
+ def revocation_reason(self):
+ if self.certificate_status is not OCSPCertStatus.REVOKED:
+ return None
+
+ reason_ptr = self._backend._ffi.new("int *")
+ self._backend._lib.OCSP_single_get0_status(
+ self._single,
+ reason_ptr,
+ self._backend._ffi.NULL,
+ self._backend._ffi.NULL,
+ self._backend._ffi.NULL,
+ )
+ # If no reason is encoded OpenSSL returns -1
+ if reason_ptr[0] == -1:
+ return None
+ else:
+ self._backend.openssl_assert(
+ reason_ptr[0] in _CRL_ENTRY_REASON_CODE_TO_ENUM
+ )
+ return _CRL_ENTRY_REASON_CODE_TO_ENUM[reason_ptr[0]]
+
+ @property
+ @_requires_successful_response
+ def this_update(self):
+ asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
+ self._backend._lib.OCSP_single_get0_status(
+ self._single,
+ self._backend._ffi.NULL,
+ self._backend._ffi.NULL,
+ asn1_time,
+ self._backend._ffi.NULL,
+ )
+ self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL)
+ return _parse_asn1_generalized_time(self._backend, asn1_time[0])
+
+ @property
+ @_requires_successful_response
+ def next_update(self):
+ asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
+ self._backend._lib.OCSP_single_get0_status(
+ self._single,
+ self._backend._ffi.NULL,
+ self._backend._ffi.NULL,
+ self._backend._ffi.NULL,
+ asn1_time,
+ )
+ self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL)
+ return _parse_asn1_generalized_time(self._backend, asn1_time[0])
+
+ @property
+ @_requires_successful_response
+ def issuer_key_hash(self):
+ return _issuer_key_hash(self._backend, self._cert_id)
+
+ @property
+ @_requires_successful_response
+ def issuer_name_hash(self):
+ return _issuer_name_hash(self._backend, self._cert_id)
+
+ @property
+ @_requires_successful_response
+ def hash_algorithm(self):
+ return _hash_algorithm(self._backend, self._cert_id)
+
+ @property
+ @_requires_successful_response
+ def serial_number(self):
+ return _serial_number(self._backend, self._cert_id)
+
+
@utils.register_interface(OCSPRequest)
class _OCSPRequest(object):
def __init__(self, backend, ocsp_request):
diff --git a/src/cryptography/x509/ocsp.py b/src/cryptography/x509/ocsp.py
index 7535a0b33..7907bcae4 100644
--- a/src/cryptography/x509/ocsp.py
+++ b/src/cryptography/x509/ocsp.py
@@ -40,11 +40,19 @@ class OCSPCertStatus(Enum):
UNKNOWN = 2
+_CERT_STATUS_TO_ENUM = dict((x.value, x) for x in OCSPCertStatus)
+
+
def load_der_ocsp_request(data):
from cryptography.hazmat.backends.openssl.backend import backend
return backend.load_der_ocsp_request(data)
+def load_der_ocsp_response(data):
+ from cryptography.hazmat.backends.openssl.backend import backend
+ return backend.load_der_ocsp_response(data)
+
+
class OCSPRequestBuilder(object):
def __init__(self, request=None):
self._request = request
diff --git a/tests/x509/test_ocsp.py b/tests/x509/test_ocsp.py
index a646f4b7e..aeaa6e6c0 100644
--- a/tests/x509/test_ocsp.py
+++ b/tests/x509/test_ocsp.py
@@ -5,6 +5,7 @@
from __future__ import absolute_import, division, print_function
import base64
+import datetime
import os
import pytest
@@ -12,6 +13,7 @@ import pytest
from cryptography import x509
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.primitives import hashes, serialization
+from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.x509 import ocsp
from .test_x509 import _load_cert
@@ -146,3 +148,138 @@ class TestOCSPRequestBuilder(object):
b"MEMwQTA/MD0wOzAJBgUrDgMCGgUABBRAC0Z68eay0wmDug1gfn5ZN0gkxAQUw5zz"
b"/NNGCDS7zkZ/oHxb8+IIy1kCAj8g"
)
+
+
+class TestOCSPResponse(object):
+ def test_bad_response(self):
+ with pytest.raises(ValueError):
+ ocsp.load_der_ocsp_response(b"invalid")
+
+ def test_load_response(self):
+ resp = _load_data(
+ os.path.join("x509", "ocsp", "resp-sha256.der"),
+ ocsp.load_der_ocsp_response,
+ )
+ from cryptography.hazmat.backends.openssl.backend import backend
+ issuer = _load_cert(
+ os.path.join("x509", "letsencryptx3.pem"),
+ x509.load_pem_x509_certificate,
+ backend
+ )
+ assert resp.response_status == ocsp.OCSPResponseStatus.SUCCESSFUL
+ assert (resp.signature_algorithm_oid ==
+ x509.SignatureAlgorithmOID.RSA_WITH_SHA256)
+ assert resp.signature == base64.b64decode(
+ b"I9KUlyLV/2LbNCVu1BQphxdNlU/jBzXsPYVscPjW5E93pCrSO84GkIWoOJtqsnt"
+ b"78DLcQPnF3W24NXGzSGKlSWfXIsyoXCxnBm0mIbD5ZMnKyXEnqSR33Z9He/A+ML"
+ b"A8gbrDUipGNPosesenkKUnOtFIzEGv29hV5E6AMP2ORPVsVlTAZegPJFbbVIWc0"
+ b"rZGFCXKxijDxtUtgWzBhpBAI50JbPHi+IVuaOe4aDJLYgZ0BIBNa6bDI+rScyoy"
+ b"5U0DToV7SZn6CoJ3U19X7BHdYn6TLX0xi43eXuzBGzdHnSzmsc7r/DvkAKJm3vb"
+ b"dVECXqe/gFlXJUBcZ25jhs70MUA=="
+ )
+ assert resp.tbs_response_bytes == base64.b64decode(
+ b"MIHWoUwwSjELMAkGA1UEBhMCVVMxFjAUBgNVBAoTDUxldCdzIEVuY3J5cHQxIzA"
+ b"hBgNVBAMTGkxldCdzIEVuY3J5cHQgQXV0aG9yaXR5IFgzGA8yMDE4MDgzMDExMT"
+ b"UwMFowdTBzMEswCQYFKw4DAhoFAAQUfuZq53Kas/z4oiBkbBahLWBxCF0EFKhKa"
+ b"mMEfd265tE5t6ZFZe/zqOyhAhIDHHh6fckClQB7xfIiCztSevCAABgPMjAxODA4"
+ b"MzAxMTAwMDBaoBEYDzIwMTgwOTA2MTEwMDAwWg=="
+ )
+ issuer.public_key().verify(
+ resp.signature,
+ resp.tbs_response_bytes,
+ PKCS1v15(),
+ hashes.SHA256()
+ )
+ assert resp.certificates == []
+ assert resp.responder_key_hash is None
+ assert resp.responder_name == issuer.subject
+ assert resp.produced_at == datetime.datetime(2018, 8, 30, 11, 15)
+ assert resp.certificate_status == ocsp.OCSPCertStatus.GOOD
+ assert resp.revocation_time is None
+ assert resp.revocation_reason is None
+ assert resp.this_update == datetime.datetime(2018, 8, 30, 11, 0)
+ assert resp.next_update == datetime.datetime(2018, 9, 6, 11, 0)
+ assert resp.issuer_key_hash == (
+ b'\xa8Jjc\x04}\xdd\xba\xe6\xd19\xb7\xa6Ee\xef\xf3\xa8\xec\xa1'
+ )
+ assert resp.issuer_name_hash == (
+ b'~\xe6j\xe7r\x9a\xb3\xfc\xf8\xa2 dl\x16\xa1-`q\x08]'
+ )
+ assert isinstance(resp.hash_algorithm, hashes.SHA1)
+ assert resp.serial_number == 271024907440004808294641238224534273948400
+
+ def test_load_unauthorized(self):
+ resp = _load_data(
+ os.path.join("x509", "ocsp", "resp-unauthorized.der"),
+ ocsp.load_der_ocsp_response,
+ )
+ assert resp.response_status == ocsp.OCSPResponseStatus.UNAUTHORIZED
+ with pytest.raises(ValueError):
+ assert resp.signature_algorithm_oid
+ with pytest.raises(ValueError):
+ assert resp.signature
+ with pytest.raises(ValueError):
+ assert resp.tbs_response_bytes
+ with pytest.raises(ValueError):
+ assert resp.certificates
+ with pytest.raises(ValueError):
+ assert resp.responder_key_hash
+ with pytest.raises(ValueError):
+ assert resp.responder_name
+ with pytest.raises(ValueError):
+ assert resp.produced_at
+ with pytest.raises(ValueError):
+ assert resp.certificate_status
+ with pytest.raises(ValueError):
+ assert resp.revocation_time
+ with pytest.raises(ValueError):
+ assert resp.revocation_reason
+ with pytest.raises(ValueError):
+ assert resp.this_update
+ with pytest.raises(ValueError):
+ assert resp.next_update
+ with pytest.raises(ValueError):
+ assert resp.issuer_key_hash
+ with pytest.raises(ValueError):
+ assert resp.issuer_name_hash
+ with pytest.raises(ValueError):
+ assert resp.hash_algorithm
+ with pytest.raises(ValueError):
+ assert resp.serial_number
+
+ def test_load_revoked(self):
+ resp = _load_data(
+ os.path.join("x509", "ocsp", "resp-revoked.der"),
+ ocsp.load_der_ocsp_response,
+ )
+ assert resp.certificate_status == ocsp.OCSPCertStatus.REVOKED
+ assert resp.revocation_time == datetime.datetime(
+ 2016, 9, 2, 21, 28, 48
+ )
+ assert resp.revocation_reason is None
+
+ def test_load_delegate_unknown_cert(self):
+ resp = _load_data(
+ os.path.join("x509", "ocsp", "resp-delegate-unknown-cert.der"),
+ ocsp.load_der_ocsp_response,
+ )
+ assert len(resp.certificates) == 1
+ assert isinstance(resp.certificates[0], x509.Certificate)
+ assert resp.certificate_status == ocsp.OCSPCertStatus.UNKNOWN
+
+ def test_load_responder_key_hash(self):
+ resp = _load_data(
+ os.path.join("x509", "ocsp", "resp-responder-key-hash.der"),
+ ocsp.load_der_ocsp_response,
+ )
+ assert resp.responder_name is None
+ assert resp.responder_key_hash == (
+ b'\x0f\x80a\x1c\x821a\xd5/(\xe7\x8dF8\xb4,\xe1\xc6\xd9\xe2'
+ )
+
+ def test_load_revoked_reason(self):
+ resp = _load_data(
+ os.path.join("x509", "ocsp", "resp-revoked-reason.der"),
+ ocsp.load_der_ocsp_response,
+ )
+ assert resp.revocation_reason is x509.ReasonFlags.superseded