summaryrefslogtreecommitdiff
path: root/src/cryptography/hazmat/backends/openssl/ciphers.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/cryptography/hazmat/backends/openssl/ciphers.py')
-rw-r--r--src/cryptography/hazmat/backends/openssl/ciphers.py134
1 files changed, 68 insertions, 66 deletions
diff --git a/src/cryptography/hazmat/backends/openssl/ciphers.py b/src/cryptography/hazmat/backends/openssl/ciphers.py
index 66ac5fd69..ad5dad3f7 100644
--- a/src/cryptography/hazmat/backends/openssl/ciphers.py
+++ b/src/cryptography/hazmat/backends/openssl/ciphers.py
@@ -17,6 +17,7 @@ from cryptography.hazmat.primitives.ciphers import modes
class _CipherContext(object):
_ENCRYPT = 1
_DECRYPT = 0
+ _MAX_CHUNK_SIZE = 2 ** 30 - 1
def __init__(self, backend, cipher, mode, operation):
self._backend = backend
@@ -40,10 +41,11 @@ class _CipherContext(object):
adapter = registry[type(cipher), type(mode)]
except KeyError:
raise UnsupportedAlgorithm(
- "cipher {0} in {1} mode is not supported "
+ "cipher {} in {} mode is not supported "
"by this backend.".format(
- cipher.name, mode.name if mode else mode),
- _Reasons.UNSUPPORTED_CIPHER
+ cipher.name, mode.name if mode else mode
+ ),
+ _Reasons.UNSUPPORTED_CIPHER,
)
evp_cipher = adapter(self._backend, cipher, mode)
@@ -53,7 +55,7 @@ class _CipherContext(object):
msg += "in {0.name} mode ".format(mode)
msg += (
"is not supported by this backend (Your version of OpenSSL "
- "may be too old. Current version: {0}.)"
+ "may be too old. Current version: {}.)"
).format(self._backend.openssl_version_text())
raise UnsupportedAlgorithm(msg, _Reasons.UNSUPPORTED_CIPHER)
@@ -70,11 +72,14 @@ class _CipherContext(object):
else:
iv_nonce = self._backend._ffi.NULL
# begin init with cipher and operation type
- res = self._backend._lib.EVP_CipherInit_ex(ctx, evp_cipher,
- self._backend._ffi.NULL,
- self._backend._ffi.NULL,
- self._backend._ffi.NULL,
- operation)
+ res = self._backend._lib.EVP_CipherInit_ex(
+ ctx,
+ evp_cipher,
+ self._backend._ffi.NULL,
+ self._backend._ffi.NULL,
+ self._backend._ffi.NULL,
+ operation,
+ )
self._backend.openssl_assert(res != 0)
# set the key length to handle variable key ciphers
res = self._backend._lib.EVP_CIPHER_CTX_set_key_length(
@@ -83,26 +88,21 @@ class _CipherContext(object):
self._backend.openssl_assert(res != 0)
if isinstance(mode, modes.GCM):
res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
- ctx, self._backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
- len(iv_nonce), self._backend._ffi.NULL
+ ctx,
+ self._backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
+ len(iv_nonce),
+ self._backend._ffi.NULL,
)
self._backend.openssl_assert(res != 0)
if mode.tag is not None:
res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
- ctx, self._backend._lib.EVP_CTRL_AEAD_SET_TAG,
- len(mode.tag), mode.tag
+ ctx,
+ self._backend._lib.EVP_CTRL_AEAD_SET_TAG,
+ len(mode.tag),
+ mode.tag,
)
self._backend.openssl_assert(res != 0)
self._tag = mode.tag
- elif (
- self._operation == self._DECRYPT and
- self._backend._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_102 and
- not self._backend._lib.CRYPTOGRAPHY_IS_LIBRESSL
- ):
- raise NotImplementedError(
- "delayed passing of GCM tag requires OpenSSL >= 1.0.2."
- " To use this feature please update OpenSSL"
- )
# pass key/iv
res = self._backend._lib.EVP_CipherInit_ex(
@@ -111,7 +111,7 @@ class _CipherContext(object):
self._backend._ffi.NULL,
self._backend._ffi.from_buffer(cipher.key),
iv_nonce,
- operation
+ operation,
)
self._backend.openssl_assert(res != 0)
# We purposely disable padding here as it's handled higher up in the
@@ -125,36 +125,38 @@ class _CipherContext(object):
return bytes(buf[:n])
def update_into(self, data, buf):
- if len(buf) < (len(data) + self._block_size_bytes - 1):
+ total_data_len = len(data)
+ if len(buf) < (total_data_len + self._block_size_bytes - 1):
raise ValueError(
- "buffer must be at least {0} bytes for this "
+ "buffer must be at least {} bytes for this "
"payload".format(len(data) + self._block_size_bytes - 1)
)
- buf = self._backend._ffi.cast(
- "unsigned char *", self._backend._ffi.from_buffer(buf)
- )
+ data_processed = 0
+ total_out = 0
outlen = self._backend._ffi.new("int *")
- res = self._backend._lib.EVP_CipherUpdate(
- self._ctx, buf, outlen,
- self._backend._ffi.from_buffer(data), len(data)
- )
- self._backend.openssl_assert(res != 0)
- return outlen[0]
+ baseoutbuf = self._backend._ffi.from_buffer(buf)
+ baseinbuf = self._backend._ffi.from_buffer(data)
- def finalize(self):
- # OpenSSL 1.0.1 on Ubuntu 12.04 (and possibly other distributions)
- # appears to have a bug where you must make at least one call to update
- # even if you are only using authenticate_additional_data or the
- # GCM tag will be wrong. An (empty) call to update resolves this
- # and is harmless for all other versions of OpenSSL.
- if isinstance(self._mode, modes.GCM):
- self.update(b"")
+ while data_processed != total_data_len:
+ outbuf = baseoutbuf + total_out
+ inbuf = baseinbuf + data_processed
+ inlen = min(self._MAX_CHUNK_SIZE, total_data_len - data_processed)
+ res = self._backend._lib.EVP_CipherUpdate(
+ self._ctx, outbuf, outlen, inbuf, inlen
+ )
+ self._backend.openssl_assert(res != 0)
+ data_processed += inlen
+ total_out += outlen[0]
+
+ return total_out
+
+ def finalize(self):
if (
- self._operation == self._DECRYPT and
- isinstance(self._mode, modes.ModeWithAuthenticationTag) and
- self.tag is None
+ self._operation == self._DECRYPT
+ and isinstance(self._mode, modes.ModeWithAuthenticationTag)
+ and self.tag is None
):
raise ValueError(
"Authentication tag must be provided when decrypting."
@@ -172,47 +174,44 @@ class _CipherContext(object):
self._backend.openssl_assert(
errors[0]._lib_reason_match(
self._backend._lib.ERR_LIB_EVP,
- self._backend._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH
- )
+ self._backend._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH,
+ ),
+ errors=errors,
)
raise ValueError(
"The length of the provided data is not a multiple of "
"the block length."
)
- if (isinstance(self._mode, modes.GCM) and
- self._operation == self._ENCRYPT):
+ if (
+ isinstance(self._mode, modes.GCM)
+ and self._operation == self._ENCRYPT
+ ):
tag_buf = self._backend._ffi.new(
"unsigned char[]", self._block_size_bytes
)
res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
- self._ctx, self._backend._lib.EVP_CTRL_AEAD_GET_TAG,
- self._block_size_bytes, tag_buf
+ self._ctx,
+ self._backend._lib.EVP_CTRL_AEAD_GET_TAG,
+ self._block_size_bytes,
+ tag_buf,
)
self._backend.openssl_assert(res != 0)
self._tag = self._backend._ffi.buffer(tag_buf)[:]
res = self._backend._lib.EVP_CIPHER_CTX_cleanup(self._ctx)
self._backend.openssl_assert(res == 1)
- return self._backend._ffi.buffer(buf)[:outlen[0]]
+ return self._backend._ffi.buffer(buf)[: outlen[0]]
def finalize_with_tag(self, tag):
- if (
- self._backend._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_102 and
- not self._backend._lib.CRYPTOGRAPHY_IS_LIBRESSL
- ):
- raise NotImplementedError(
- "finalize_with_tag requires OpenSSL >= 1.0.2. To use this "
- "method please update OpenSSL"
- )
if len(tag) < self._mode._min_tag_length:
raise ValueError(
- "Authentication tag must be {0} bytes or longer.".format(
- self._mode._min_tag_length)
+ "Authentication tag must be {} bytes or longer.".format(
+ self._mode._min_tag_length
+ )
)
res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
- self._ctx, self._backend._lib.EVP_CTRL_AEAD_SET_TAG,
- len(tag), tag
+ self._ctx, self._backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag
)
self._backend.openssl_assert(res != 0)
self._tag = tag
@@ -221,8 +220,11 @@ class _CipherContext(object):
def authenticate_additional_data(self, data):
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(
- self._ctx, self._backend._ffi.NULL, outlen,
- self._backend._ffi.from_buffer(data), len(data)
+ self._ctx,
+ self._backend._ffi.NULL,
+ outlen,
+ self._backend._ffi.from_buffer(data),
+ len(data),
)
self._backend.openssl_assert(res != 0)