diff options
author | elie <elie> | 2011-02-17 18:35:16 +0000 |
---|---|---|
committer | elie <elie> | 2011-02-17 18:35:16 +0000 |
commit | 8b513895caeda00b34f662bec79238a5bf636383 (patch) | |
tree | c0d45c812a29111261b57d33e0a0c05bccfb9636 /tools/ocspclient.py | |
download | pyasn1-modules-8b513895caeda00b34f662bec79238a5bf636383.tar.gz |
Initial revision
Diffstat (limited to 'tools/ocspclient.py')
-rw-r--r-- | tools/ocspclient.py | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/tools/ocspclient.py b/tools/ocspclient.py new file mode 100644 index 0000000..6680a10 --- /dev/null +++ b/tools/ocspclient.py @@ -0,0 +1,136 @@ +from pyasn1.codec.der import decoder, encoder +from pyasn1_modules import rfc2560, rfc2459, pem +from pyasn1.type import univ +import sys, urllib2, hashlib + +sha1oid = univ.ObjectIdentifier((1, 3, 14, 3, 2, 26)) + +class ValueOnlyBitStringEncoder(encoder.encoder.BitStringEncoder): + # These methods just do not encode tag and length fields of TLV + def encodeTag(self, *args): return '' + def encodeLength(self, *args): return '' + def encodeValue(*args): + substrate, isConstructed = encoder.encoder.BitStringEncoder.encodeValue(*args) + # OCSP-specific hack follows: cut off the "unused bit count" + # encoded bit-string value. + return substrate[1:], isConstructed + + def __call__(self, bitStringValue): + return self.encode(None, bitStringValue, defMode=1, maxChunkSize=0) + +valueOnlyBitStringEncoder = ValueOnlyBitStringEncoder() + +def mkOcspRequest(issuerCert, userCert): + issuerTbsCertificate = issuerCert.getComponentByName('tbsCertificate') + issuerSubject = issuerTbsCertificate.getComponentByName('subject') + + userTbsCertificate = userCert.getComponentByName('tbsCertificate') + userIssuer = userTbsCertificate.getComponentByName('issuer') + + assert issuerSubject == userIssuer, '%s\n%s' % ( + issuerSubject.prettyPrint(), userIssuer.prettyPrint() + ) + + userIssuerHash = hashlib.sha1( + encoder.encode(userIssuer) + ).digest() + + issuerSubjectPublicKey = issuerTbsCertificate.getComponentByName('subjectPublicKeyInfo').getComponentByName('subjectPublicKey') + + issuerKeyHash = hashlib.sha1( + valueOnlyBitStringEncoder(issuerSubjectPublicKey) + ).digest() + + userSerialNumber = userTbsCertificate.getComponentByName('serialNumber') + + # Build request object + + request = rfc2560.Request() + + reqCert = request.setComponentByName('reqCert').getComponentByName('reqCert') + + hashAlgorithm = reqCert.setComponentByName('hashAlgorithm').getComponentByName('hashAlgorithm') + hashAlgorithm.setComponentByName('algorithm', sha1oid) + + reqCert.setComponentByName('issuerNameHash', userIssuerHash) + reqCert.setComponentByName('issuerKeyHash', issuerKeyHash) + reqCert.setComponentByName('serialNumber', userSerialNumber) + + ocspRequest = rfc2560.OCSPRequest() + + tbsRequest = ocspRequest.setComponentByName('tbsRequest').getComponentByName('tbsRequest') + tbsRequest.setComponentByName('version', 'v1') + + requestList = tbsRequest.setComponentByName('requestList').getComponentByName('requestList') + requestList.setComponentByPosition(0, request) + + return ocspRequest + +def parseOcspResponse(ocspResponse): + responseStatus = ocspResponse.getComponentByName('responseStatus') + assert responseStatus == rfc2560.OCSPResponseStatus('successful'), responseStatus.prettyPrint() + responseBytes = ocspResponse.getComponentByName('responseBytes') + responseType = responseBytes.getComponentByName('responseType') + assert responseType == id_pkix_ocsp_basic, responseType.prettyPrint() + + response = responseBytes.getComponentByName('response') + + basicOCSPResponse, _ = decoder.decode( + response, asn1Spec=rfc2560.BasicOCSPResponse() + ) + + tbsResponseData = basicOCSPResponse.getComponentByName('tbsResponseData') + + response0 = tbsResponseData.getComponentByName('responses').getComponentByPosition(0) + + return ( + tbsResponseData.getComponentByName('producedAt'), + response0.getComponentByName('certID'), + response0.getComponentByName('certStatus').getName(), + response0.getComponentByName('thisUpdate') + ) + +if len(sys.argv) != 2: + print """Usage: +$ cat CACertificate.pem userCertificate.pem | %s <ocsp-responder-url>""" % sys.argv[0] + sys.exit(-1) +else: + ocspUrl = sys.argv[1] + +# Parse CA and user certificates + +issuerCert, _ = decoder.decode( + pem.readPemFromFile(sys.stdin), + asn1Spec=rfc2459.Certificate() + ) +userCert, _ = decoder.decode( + pem.readPemFromFile(sys.stdin), + asn1Spec=rfc2459.Certificate() + ) + +# Build OCSP request + +ocspReq = mkOcspRequest(issuerCert, userCert) + +# Use HTTP POST to get response (see Appendix A of RFC 2560) +# In case you need proxies, set the http_proxy env variable + +httpReq = urllib2.Request( + ocspUrl, + encoder.encode(ocspReq), + { 'Content-Type': 'application/ocsp-request' } + ) +httpRsp = urllib2.urlopen(httpReq).read() + +# Process OCSP response + +ocspRsp, _ = decoder.decode(httpRsp, asn1Spec=rfc2560.OCSPResponse()) + +producedAt, certId, certStatus, thisUpdate = parseOcspResponse(ocspRsp) + +print 'Certificate ID %s is %s at %s till %s\n' % ( + certId.getComponentByName('serialNumber'), + certStatus, + producedAt, + thisUpdate + ) |