| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from __future__ import absolute_import, division, print_function |
| |
| from cryptography import utils |
| from cryptography.exceptions import ( |
| InvalidTag, UnsupportedAlgorithm, _Reasons |
| ) |
| from cryptography.hazmat.primitives import constant_time, interfaces |
| from cryptography.hazmat.primitives.ciphers.modes import ( |
| CFB, CFB8, CTR, OFB |
| ) |
| |
| |
| @utils.register_interface(interfaces.CipherContext) |
| class _CipherContext(object): |
| def __init__(self, backend, cipher, mode, operation): |
| self._backend = backend |
| self._cipher = cipher |
| self._mode = mode |
| self._operation = operation |
| # There is a bug in CommonCrypto where block ciphers do not raise |
| # kCCAlignmentError when finalizing if you supply non-block aligned |
| # data. To work around this we need to keep track of the block |
| # alignment ourselves, but only for alg+mode combos that require |
| # block alignment. OFB, CFB, and CTR make a block cipher algorithm |
| # into a stream cipher so we don't need to track them (and thus their |
| # block size is effectively 1 byte just like OpenSSL/CommonCrypto |
| # treat RC4 and other stream cipher block sizes). |
| # This bug has been filed as rdar://15589470 |
| self._bytes_processed = 0 |
| if (isinstance(cipher, interfaces.BlockCipherAlgorithm) and not |
| isinstance(mode, (OFB, CFB, CFB8, CTR))): |
| self._byte_block_size = cipher.block_size // 8 |
| else: |
| self._byte_block_size = 1 |
| |
| registry = self._backend._cipher_registry |
| try: |
| cipher_enum, mode_enum = registry[type(cipher), type(mode)] |
| except KeyError: |
| raise UnsupportedAlgorithm( |
| "cipher {0} in {1} mode is not supported " |
| "by this backend.".format( |
| cipher.name, mode.name if mode else mode), |
| _Reasons.UNSUPPORTED_CIPHER |
| ) |
| |
| ctx = self._backend._ffi.new("CCCryptorRef *") |
| ctx = self._backend._ffi.gc(ctx, self._backend._release_cipher_ctx) |
| |
| if isinstance(mode, interfaces.ModeWithInitializationVector): |
| iv_nonce = mode.initialization_vector |
| elif isinstance(mode, interfaces.ModeWithNonce): |
| iv_nonce = mode.nonce |
| else: |
| iv_nonce = self._backend._ffi.NULL |
| |
| if isinstance(mode, CTR): |
| mode_option = self._backend._lib.kCCModeOptionCTR_BE |
| else: |
| mode_option = 0 |
| |
| res = self._backend._lib.CCCryptorCreateWithMode( |
| operation, |
| mode_enum, cipher_enum, |
| self._backend._lib.ccNoPadding, iv_nonce, |
| cipher.key, len(cipher.key), |
| self._backend._ffi.NULL, 0, 0, mode_option, ctx) |
| self._backend._check_cipher_response(res) |
| |
| self._ctx = ctx |
| |
| def update(self, data): |
| # Count bytes processed to handle block alignment. |
| self._bytes_processed += len(data) |
| buf = self._backend._ffi.new( |
| "unsigned char[]", len(data) + self._byte_block_size - 1) |
| outlen = self._backend._ffi.new("size_t *") |
| res = self._backend._lib.CCCryptorUpdate( |
| self._ctx[0], data, len(data), buf, |
| len(data) + self._byte_block_size - 1, outlen) |
| self._backend._check_cipher_response(res) |
| return self._backend._ffi.buffer(buf)[:outlen[0]] |
| |
| def finalize(self): |
| # Raise error if block alignment is wrong. |
| if self._bytes_processed % self._byte_block_size: |
| raise ValueError( |
| "The length of the provided data is not a multiple of " |
| "the block length." |
| ) |
| buf = self._backend._ffi.new("unsigned char[]", self._byte_block_size) |
| outlen = self._backend._ffi.new("size_t *") |
| res = self._backend._lib.CCCryptorFinal( |
| self._ctx[0], buf, len(buf), outlen) |
| self._backend._check_cipher_response(res) |
| self._backend._release_cipher_ctx(self._ctx) |
| return self._backend._ffi.buffer(buf)[:outlen[0]] |
| |
| |
| @utils.register_interface(interfaces.AEADCipherContext) |
| @utils.register_interface(interfaces.AEADEncryptionContext) |
| class _GCMCipherContext(object): |
| def __init__(self, backend, cipher, mode, operation): |
| self._backend = backend |
| self._cipher = cipher |
| self._mode = mode |
| self._operation = operation |
| self._tag = None |
| |
| registry = self._backend._cipher_registry |
| try: |
| cipher_enum, mode_enum = registry[type(cipher), type(mode)] |
| except KeyError: |
| raise UnsupportedAlgorithm( |
| "cipher {0} in {1} mode is not supported " |
| "by this backend.".format( |
| cipher.name, mode.name if mode else mode), |
| _Reasons.UNSUPPORTED_CIPHER |
| ) |
| |
| ctx = self._backend._ffi.new("CCCryptorRef *") |
| ctx = self._backend._ffi.gc(ctx, self._backend._release_cipher_ctx) |
| |
| self._ctx = ctx |
| |
| res = self._backend._lib.CCCryptorCreateWithMode( |
| operation, |
| mode_enum, cipher_enum, |
| self._backend._lib.ccNoPadding, |
| self._backend._ffi.NULL, |
| cipher.key, len(cipher.key), |
| self._backend._ffi.NULL, 0, 0, 0, self._ctx) |
| self._backend._check_cipher_response(res) |
| |
| res = self._backend._lib.CCCryptorGCMAddIV( |
| self._ctx[0], |
| mode.initialization_vector, |
| len(mode.initialization_vector) |
| ) |
| self._backend._check_cipher_response(res) |
| # CommonCrypto has a bug where calling update without at least one |
| # call to authenticate_additional_data will result in null byte output |
| # for ciphertext. The following empty byte string call prevents the |
| # issue, which is present in at least 10.8 and 10.9. |
| # Filed as rdar://18314544 |
| self.authenticate_additional_data(b"") |
| |
| def update(self, data): |
| buf = self._backend._ffi.new("unsigned char[]", len(data)) |
| args = (self._ctx[0], data, len(data), buf) |
| if self._operation == self._backend._lib.kCCEncrypt: |
| res = self._backend._lib.CCCryptorGCMEncrypt(*args) |
| else: |
| res = self._backend._lib.CCCryptorGCMDecrypt(*args) |
| |
| self._backend._check_cipher_response(res) |
| return self._backend._ffi.buffer(buf)[:] |
| |
| def finalize(self): |
| # CommonCrypto has a yet another bug where you must make at least one |
| # call to update. If you pass just AAD and call finalize without a call |
| # to update you'll get null bytes for tag. The following update call |
| # prevents this issue, which is present in at least 10.8 and 10.9. |
| # Filed as rdar://18314580 |
| self.update(b"") |
| tag_size = self._cipher.block_size // 8 |
| tag_buf = self._backend._ffi.new("unsigned char[]", tag_size) |
| tag_len = self._backend._ffi.new("size_t *", tag_size) |
| res = self._backend._lib.CCCryptorGCMFinal( |
| self._ctx[0], tag_buf, tag_len |
| ) |
| self._backend._check_cipher_response(res) |
| self._backend._release_cipher_ctx(self._ctx) |
| self._tag = self._backend._ffi.buffer(tag_buf)[:] |
| if (self._operation == self._backend._lib.kCCDecrypt and |
| not constant_time.bytes_eq( |
| self._tag[:len(self._mode.tag)], self._mode.tag |
| )): |
| raise InvalidTag |
| return b"" |
| |
| def authenticate_additional_data(self, data): |
| res = self._backend._lib.CCCryptorGCMAddAAD( |
| self._ctx[0], data, len(data) |
| ) |
| self._backend._check_cipher_response(res) |
| |
| tag = utils.read_only_property("_tag") |