aboutsummaryrefslogtreecommitdiff
path: root/tools/ocspclient.py
diff options
context:
space:
mode:
authorelie <elie>2011-02-17 18:35:16 +0000
committerelie <elie>2011-02-17 18:35:16 +0000
commit8b513895caeda00b34f662bec79238a5bf636383 (patch)
treec0d45c812a29111261b57d33e0a0c05bccfb9636 /tools/ocspclient.py
downloadpyasn1-modules-8b513895caeda00b34f662bec79238a5bf636383.tar.gz
Initial revision
Diffstat (limited to 'tools/ocspclient.py')
-rw-r--r--tools/ocspclient.py136
1 files changed, 136 insertions, 0 deletions
diff --git a/tools/ocspclient.py b/tools/ocspclient.py
new file mode 100644
index 0000000..6680a10
--- /dev/null
+++ b/tools/ocspclient.py
@@ -0,0 +1,136 @@
+from pyasn1.codec.der import decoder, encoder
+from pyasn1_modules import rfc2560, rfc2459, pem
+from pyasn1.type import univ
+import sys, urllib2, hashlib
+
+sha1oid = univ.ObjectIdentifier((1, 3, 14, 3, 2, 26))
+
+class ValueOnlyBitStringEncoder(encoder.encoder.BitStringEncoder):
+ # These methods just do not encode tag and length fields of TLV
+ def encodeTag(self, *args): return ''
+ def encodeLength(self, *args): return ''
+ def encodeValue(*args):
+ substrate, isConstructed = encoder.encoder.BitStringEncoder.encodeValue(*args)
+ # OCSP-specific hack follows: cut off the "unused bit count"
+ # encoded bit-string value.
+ return substrate[1:], isConstructed
+
+ def __call__(self, bitStringValue):
+ return self.encode(None, bitStringValue, defMode=1, maxChunkSize=0)
+
+valueOnlyBitStringEncoder = ValueOnlyBitStringEncoder()
+
+def mkOcspRequest(issuerCert, userCert):
+ issuerTbsCertificate = issuerCert.getComponentByName('tbsCertificate')
+ issuerSubject = issuerTbsCertificate.getComponentByName('subject')
+
+ userTbsCertificate = userCert.getComponentByName('tbsCertificate')
+ userIssuer = userTbsCertificate.getComponentByName('issuer')
+
+ assert issuerSubject == userIssuer, '%s\n%s' % (
+ issuerSubject.prettyPrint(), userIssuer.prettyPrint()
+ )
+
+ userIssuerHash = hashlib.sha1(
+ encoder.encode(userIssuer)
+ ).digest()
+
+ issuerSubjectPublicKey = issuerTbsCertificate.getComponentByName('subjectPublicKeyInfo').getComponentByName('subjectPublicKey')
+
+ issuerKeyHash = hashlib.sha1(
+ valueOnlyBitStringEncoder(issuerSubjectPublicKey)
+ ).digest()
+
+ userSerialNumber = userTbsCertificate.getComponentByName('serialNumber')
+
+ # Build request object
+
+ request = rfc2560.Request()
+
+ reqCert = request.setComponentByName('reqCert').getComponentByName('reqCert')
+
+ hashAlgorithm = reqCert.setComponentByName('hashAlgorithm').getComponentByName('hashAlgorithm')
+ hashAlgorithm.setComponentByName('algorithm', sha1oid)
+
+ reqCert.setComponentByName('issuerNameHash', userIssuerHash)
+ reqCert.setComponentByName('issuerKeyHash', issuerKeyHash)
+ reqCert.setComponentByName('serialNumber', userSerialNumber)
+
+ ocspRequest = rfc2560.OCSPRequest()
+
+ tbsRequest = ocspRequest.setComponentByName('tbsRequest').getComponentByName('tbsRequest')
+ tbsRequest.setComponentByName('version', 'v1')
+
+ requestList = tbsRequest.setComponentByName('requestList').getComponentByName('requestList')
+ requestList.setComponentByPosition(0, request)
+
+ return ocspRequest
+
+def parseOcspResponse(ocspResponse):
+ responseStatus = ocspResponse.getComponentByName('responseStatus')
+ assert responseStatus == rfc2560.OCSPResponseStatus('successful'), responseStatus.prettyPrint()
+ responseBytes = ocspResponse.getComponentByName('responseBytes')
+ responseType = responseBytes.getComponentByName('responseType')
+ assert responseType == id_pkix_ocsp_basic, responseType.prettyPrint()
+
+ response = responseBytes.getComponentByName('response')
+
+ basicOCSPResponse, _ = decoder.decode(
+ response, asn1Spec=rfc2560.BasicOCSPResponse()
+ )
+
+ tbsResponseData = basicOCSPResponse.getComponentByName('tbsResponseData')
+
+ response0 = tbsResponseData.getComponentByName('responses').getComponentByPosition(0)
+
+ return (
+ tbsResponseData.getComponentByName('producedAt'),
+ response0.getComponentByName('certID'),
+ response0.getComponentByName('certStatus').getName(),
+ response0.getComponentByName('thisUpdate')
+ )
+
+if len(sys.argv) != 2:
+ print """Usage:
+$ cat CACertificate.pem userCertificate.pem | %s <ocsp-responder-url>""" % sys.argv[0]
+ sys.exit(-1)
+else:
+ ocspUrl = sys.argv[1]
+
+# Parse CA and user certificates
+
+issuerCert, _ = decoder.decode(
+ pem.readPemFromFile(sys.stdin),
+ asn1Spec=rfc2459.Certificate()
+ )
+userCert, _ = decoder.decode(
+ pem.readPemFromFile(sys.stdin),
+ asn1Spec=rfc2459.Certificate()
+ )
+
+# Build OCSP request
+
+ocspReq = mkOcspRequest(issuerCert, userCert)
+
+# Use HTTP POST to get response (see Appendix A of RFC 2560)
+# In case you need proxies, set the http_proxy env variable
+
+httpReq = urllib2.Request(
+ ocspUrl,
+ encoder.encode(ocspReq),
+ { 'Content-Type': 'application/ocsp-request' }
+ )
+httpRsp = urllib2.urlopen(httpReq).read()
+
+# Process OCSP response
+
+ocspRsp, _ = decoder.decode(httpRsp, asn1Spec=rfc2560.OCSPResponse())
+
+producedAt, certId, certStatus, thisUpdate = parseOcspResponse(ocspRsp)
+
+print 'Certificate ID %s is %s at %s till %s\n' % (
+ certId.getComponentByName('serialNumber'),
+ certStatus,
+ producedAt,
+ thisUpdate
+ )