| # Copyright 2020 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS-IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Tests for tink.tools.testing.python.testing_server.""" |
| |
| from absl.testing import absltest |
| import grpc |
| |
| import tink |
| from tink import aead |
| from tink import daead |
| from tink import hybrid |
| from tink import mac |
| from tink import prf |
| from tink import signature |
| from tink import streaming_aead |
| |
| |
| from protos import testing_api_pb2 |
| import services |
| |
| |
| class DummyServicerContext(grpc.ServicerContext): |
| |
| def is_active(self): |
| pass |
| |
| def time_remaining(self): |
| pass |
| |
| def cancel(self): |
| pass |
| |
| def add_callback(self, callback): |
| pass |
| |
| def invocation_metadata(self): |
| pass |
| |
| def peer(self): |
| pass |
| |
| def peer_identities(self): |
| pass |
| |
| def peer_identity_key(self): |
| pass |
| |
| def auth_context(self): |
| pass |
| |
| def set_compression(self, compression): |
| pass |
| |
| def send_initial_metadata(self, initial_metadata): |
| pass |
| |
| def set_trailing_metadata(self, trailing_metadata): |
| pass |
| |
| def abort(self, code, details): |
| pass |
| |
| def abort_with_status(self, status): |
| pass |
| |
| def set_code(self, code): |
| pass |
| |
| def set_details(self, details): |
| pass |
| |
| def disable_next_message_compression(self): |
| pass |
| |
| |
| class ServicesTest(absltest.TestCase): |
| |
| _ctx = DummyServicerContext() |
| |
| @classmethod |
| def setUpClass(cls): |
| super().setUpClass() |
| aead.register() |
| daead.register() |
| mac.register() |
| hybrid.register() |
| prf.register() |
| signature.register() |
| streaming_aead.register() |
| |
| def test_from_json(self): |
| keyset_servicer = services.KeysetServicer() |
| json_keyset = """ |
| { |
| "primaryKeyId": 42, |
| "key": [ |
| { |
| "keyData": { |
| "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey", |
| "keyMaterialType": "SYMMETRIC", |
| "value": "AFakeTestKeyValue1234567" |
| |
| }, |
| "outputPrefixType": "TINK", |
| "keyId": 42, |
| "status": "ENABLED" |
| } |
| ] |
| }""" |
| request = testing_api_pb2.KeysetFromJsonRequest(json_keyset=json_keyset) |
| response = keyset_servicer.FromJson(request, self._ctx) |
| self.assertEqual(response.WhichOneof('result'), 'keyset') |
| keyset = tink.BinaryKeysetReader(response.keyset).read() |
| self.assertEqual(keyset.primary_key_id, 42) |
| self.assertLen(keyset.key, 1) |
| |
| def test_from_json_fail(self): |
| keyset_servicer = services.KeysetServicer() |
| request = testing_api_pb2.KeysetFromJsonRequest(json_keyset='bad json') |
| response = keyset_servicer.FromJson(request, self._ctx) |
| self.assertEqual(response.WhichOneof('result'), 'err') |
| self.assertNotEmpty(response.err) |
| |
| def test_generate_to_from_json(self): |
| keyset_servicer = services.KeysetServicer() |
| |
| template = aead.aead_key_templates.AES128_GCM.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| keyset = gen_response.keyset |
| |
| tojson_request = testing_api_pb2.KeysetToJsonRequest(keyset=keyset) |
| tojson_response = keyset_servicer.ToJson(tojson_request, self._ctx) |
| self.assertEqual(tojson_response.WhichOneof('result'), 'json_keyset') |
| json_keyset = tojson_response.json_keyset |
| |
| fromjson_request = testing_api_pb2.KeysetFromJsonRequest( |
| json_keyset=json_keyset) |
| fromjson_response = keyset_servicer.FromJson(fromjson_request, self._ctx) |
| self.assertEqual(fromjson_response.WhichOneof('result'), 'keyset') |
| self.assertEqual(fromjson_response.keyset, keyset) |
| |
| def test_to_json_fail(self): |
| keyset_servicer = services.KeysetServicer() |
| request = testing_api_pb2.KeysetToJsonRequest(keyset=b'bad keyset') |
| response = keyset_servicer.ToJson(request, self._ctx) |
| self.assertEqual(response.WhichOneof('result'), 'err') |
| self.assertNotEmpty(response.err) |
| |
| def test_generate_keyset_write_read_encrypted(self): |
| keyset_servicer = services.KeysetServicer() |
| |
| template = aead.aead_key_templates.AES128_GCM.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| master_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(master_response.WhichOneof('result'), 'keyset') |
| master_keyset = master_response.keyset |
| |
| keyset_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(keyset_response.WhichOneof('result'), 'keyset') |
| keyset = keyset_response.keyset |
| |
| write_encrypted_request = testing_api_pb2.KeysetWriteEncryptedRequest( |
| keyset=keyset, |
| master_keyset=master_keyset, |
| keyset_writer_type=testing_api_pb2.KEYSET_WRITER_BINARY) |
| write_encrypted_response = keyset_servicer.WriteEncrypted( |
| write_encrypted_request, self._ctx) |
| self.assertEqual( |
| write_encrypted_response.WhichOneof('result'), 'encrypted_keyset') |
| encrypted_keyset = write_encrypted_response.encrypted_keyset |
| |
| read_encrypted_request = testing_api_pb2.KeysetReadEncryptedRequest( |
| encrypted_keyset=encrypted_keyset, |
| master_keyset=master_keyset, |
| keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY) |
| read_encrypted_response = keyset_servicer.ReadEncrypted( |
| read_encrypted_request, self._ctx) |
| self.assertEqual(read_encrypted_response.WhichOneof('result'), 'keyset') |
| self.assertEqual(read_encrypted_response.keyset, keyset) |
| |
| def test_generate_keyset_write_read_encrypted_with_associated_data(self): |
| keyset_servicer = services.KeysetServicer() |
| |
| template = aead.aead_key_templates.AES128_GCM.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| master_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(master_response.WhichOneof('result'), 'keyset') |
| master_keyset = master_response.keyset |
| |
| keyset_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(keyset_response.WhichOneof('result'), 'keyset') |
| keyset = keyset_response.keyset |
| |
| associated_data = b'associated_data' |
| |
| write_encrypted_request = testing_api_pb2.KeysetWriteEncryptedRequest( |
| keyset=keyset, |
| master_keyset=master_keyset, |
| associated_data=testing_api_pb2.BytesValue(value=associated_data), |
| keyset_writer_type=testing_api_pb2.KEYSET_WRITER_BINARY) |
| write_encrypted_response = keyset_servicer.WriteEncrypted( |
| write_encrypted_request, self._ctx) |
| self.assertEqual( |
| write_encrypted_response.WhichOneof('result'), 'encrypted_keyset') |
| encrypted_keyset = write_encrypted_response.encrypted_keyset |
| |
| read_encrypted_request = testing_api_pb2.KeysetReadEncryptedRequest( |
| encrypted_keyset=encrypted_keyset, |
| master_keyset=master_keyset, |
| associated_data=testing_api_pb2.BytesValue(value=associated_data), |
| keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY) |
| read_encrypted_response = keyset_servicer.ReadEncrypted( |
| read_encrypted_request, self._ctx) |
| self.assertEqual(read_encrypted_response.WhichOneof('result'), 'keyset') |
| self.assertEqual(read_encrypted_response.keyset, keyset) |
| |
| # Using the wrong associated_data fails |
| read_encrypted_request = testing_api_pb2.KeysetReadEncryptedRequest( |
| encrypted_keyset=encrypted_keyset, |
| master_keyset=master_keyset, |
| associated_data=testing_api_pb2.BytesValue(value=b'wrong ad'), |
| keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY) |
| read_encrypted_response = keyset_servicer.ReadEncrypted( |
| read_encrypted_request, self._ctx) |
| self.assertEqual(read_encrypted_response.WhichOneof('result'), 'err') |
| |
| def test_keyset_write_encrypted_fails_when_keyset_is_invalid(self): |
| keyset_servicer = services.KeysetServicer() |
| |
| template = aead.aead_key_templates.AES128_GCM.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| master_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(master_response.WhichOneof('result'), 'keyset') |
| master_keyset = master_response.keyset |
| |
| write_encrypted_request = testing_api_pb2.KeysetWriteEncryptedRequest( |
| keyset=b'invalid', |
| master_keyset=master_keyset, |
| keyset_writer_type=testing_api_pb2.KEYSET_WRITER_BINARY) |
| write_encrypted_response = keyset_servicer.WriteEncrypted( |
| write_encrypted_request, self._ctx) |
| self.assertEqual(write_encrypted_response.WhichOneof('result'), 'err') |
| |
| def test_keyset_read_encrypted_fails_when_encrypted_keyset_is_invalid(self): |
| keyset_servicer = services.KeysetServicer() |
| |
| template = aead.aead_key_templates.AES128_GCM.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| master_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(master_response.WhichOneof('result'), 'keyset') |
| master_keyset = master_response.keyset |
| |
| read_encrypted_request = testing_api_pb2.KeysetReadEncryptedRequest( |
| encrypted_keyset=b'invalid', |
| master_keyset=master_keyset, |
| keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY) |
| read_encrypted_response = keyset_servicer.ReadEncrypted( |
| read_encrypted_request, self._ctx) |
| self.assertEqual(read_encrypted_response.WhichOneof('result'), 'err') |
| |
| def test_create_aead(self): |
| keyset_servicer = services.KeysetServicer() |
| aead_servicer = services.AeadServicer() |
| |
| template = aead.aead_key_templates.AES128_GCM.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=gen_response.keyset)) |
| creation_response = aead_servicer.Create(creation_request, self._ctx) |
| self.assertEmpty(creation_response.err) |
| |
| def test_create_aead_broken_keyset(self): |
| aead_servicer = services.AeadServicer() |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=b'\x80')) |
| creation_response = aead_servicer.Create(creation_request, self._ctx) |
| self.assertNotEmpty(creation_response.err) |
| |
| def test_encrypt_decrypt_wrong_keyset(self): |
| aead_servicer = services.AeadServicer() |
| keyset_servicer = services.KeysetServicer() |
| # HMAC keysets will not allow creation of an AEAD. |
| template = mac.mac_key_templates.HMAC_SHA256_128BITTAG.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| keyset = gen_response.keyset |
| |
| with self.assertRaises(tink.TinkError): |
| aead_servicer.Encrypt( |
| testing_api_pb2.AeadEncryptRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset)), self._ctx) |
| |
| with self.assertRaises(tink.TinkError): |
| aead_servicer.Decrypt( |
| testing_api_pb2.AeadDecryptRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset)), self._ctx) |
| |
| def test_generate_encrypt_decrypt(self): |
| keyset_servicer = services.KeysetServicer() |
| aead_servicer = services.AeadServicer() |
| |
| template = aead.aead_key_templates.AES128_GCM.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| keyset = gen_response.keyset |
| plaintext = b'The quick brown fox jumps over the lazy dog' |
| associated_data = b'associated_data' |
| enc_request = testing_api_pb2.AeadEncryptRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset), |
| plaintext=plaintext, |
| associated_data=associated_data) |
| enc_response = aead_servicer.Encrypt(enc_request, self._ctx) |
| self.assertEqual(enc_response.WhichOneof('result'), 'ciphertext') |
| ciphertext = enc_response.ciphertext |
| dec_request = testing_api_pb2.AeadDecryptRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset), |
| ciphertext=ciphertext, |
| associated_data=associated_data) |
| dec_response = aead_servicer.Decrypt(dec_request, self._ctx) |
| self.assertEqual(dec_response.WhichOneof('result'), 'plaintext') |
| self.assertEqual(dec_response.plaintext, plaintext) |
| |
| def test_generate_decrypt_fail(self): |
| keyset_servicer = services.KeysetServicer() |
| aead_servicer = services.AeadServicer() |
| |
| template = aead.aead_key_templates.AES128_GCM.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| keyset = gen_response.keyset |
| |
| ciphertext = b'some invalid ciphertext' |
| associated_data = b'associated_data' |
| dec_request = testing_api_pb2.AeadDecryptRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset), |
| ciphertext=ciphertext, |
| associated_data=associated_data) |
| dec_response = aead_servicer.Decrypt(dec_request, self._ctx) |
| self.assertEqual(dec_response.WhichOneof('result'), 'err') |
| self.assertNotEmpty(dec_response.err) |
| |
| def test_server_info(self): |
| metadata_servicer = services.MetadataServicer() |
| request = testing_api_pb2.ServerInfoRequest() |
| response = metadata_servicer.GetServerInfo(request, self._ctx) |
| self.assertEqual(response.language, 'python') |
| |
| def test_create_deterministic_aead(self): |
| keyset_servicer = services.KeysetServicer() |
| daead_servicer = services.DeterministicAeadServicer() |
| |
| template_proto = daead.deterministic_aead_key_templates.AES256_SIV |
| template = template_proto.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=gen_response.keyset)) |
| creation_response = daead_servicer.Create( |
| creation_request, self._ctx) |
| self.assertEmpty(creation_response.err) |
| |
| def test_create_deterministic_aead_broken_keyset(self): |
| daead_servicer = services.DeterministicAeadServicer() |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=b'\x80')) |
| creation_response = daead_servicer.Create(creation_request, self._ctx) |
| self.assertNotEmpty(creation_response.err) |
| |
| def test_encrypt_decrypt_deterministic_aead_broken_keyset(self): |
| keyset_servicer = services.KeysetServicer() |
| daead_servicer = services.DeterministicAeadServicer() |
| |
| # AES128_GCM keysets will not allow creation of an Deterministic AEAD. |
| template = aead.aead_key_templates.AES128_GCM.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| keyset = gen_response.keyset |
| |
| enc_request = testing_api_pb2.DeterministicAeadEncryptRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset)) |
| with self.assertRaises(tink.TinkError): |
| daead_servicer.EncryptDeterministically(enc_request, self._ctx) |
| dec_request = testing_api_pb2.DeterministicAeadDecryptRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset)) |
| with self.assertRaises(tink.TinkError): |
| daead_servicer.DecryptDeterministically(dec_request, self._ctx) |
| |
| def test_generate_encrypt_decrypt_deterministically(self): |
| keyset_servicer = services.KeysetServicer() |
| daead_servicer = services.DeterministicAeadServicer() |
| |
| template_proto = daead.deterministic_aead_key_templates.AES256_SIV |
| template = template_proto.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| keyset = gen_response.keyset |
| plaintext = b'The quick brown fox jumps over the lazy dog' |
| associated_data = b'associated_data' |
| enc_request = testing_api_pb2.DeterministicAeadEncryptRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset), |
| plaintext=plaintext, |
| associated_data=associated_data) |
| enc_response = daead_servicer.EncryptDeterministically(enc_request, |
| self._ctx) |
| self.assertEqual(enc_response.WhichOneof('result'), 'ciphertext') |
| enc_response2 = daead_servicer.EncryptDeterministically(enc_request, |
| self._ctx) |
| self.assertEqual(enc_response2.WhichOneof('result'), 'ciphertext') |
| self.assertEqual(enc_response2.ciphertext, enc_response.ciphertext) |
| ciphertext = enc_response.ciphertext |
| dec_request = testing_api_pb2.DeterministicAeadDecryptRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset), |
| ciphertext=ciphertext, |
| associated_data=associated_data) |
| dec_response = daead_servicer.DecryptDeterministically(dec_request, |
| self._ctx) |
| self.assertEqual(dec_response.WhichOneof('result'), 'plaintext') |
| self.assertEqual(dec_response.plaintext, plaintext) |
| |
| def test_generate_decrypt_deterministically_fail(self): |
| keyset_servicer = services.KeysetServicer() |
| daead_servicer = services.DeterministicAeadServicer() |
| |
| template_proto = daead.deterministic_aead_key_templates.AES256_SIV |
| template = template_proto.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| keyset = gen_response.keyset |
| |
| ciphertext = b'some invalid ciphertext' |
| associated_data = b'associated_data' |
| dec_request = testing_api_pb2.DeterministicAeadDecryptRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset), |
| ciphertext=ciphertext, |
| associated_data=associated_data) |
| dec_response = daead_servicer.DecryptDeterministically(dec_request, |
| self._ctx) |
| self.assertEqual(dec_response.WhichOneof('result'), 'err') |
| self.assertNotEmpty(dec_response.err) |
| |
| def test_create_mac(self): |
| keyset_servicer = services.KeysetServicer() |
| mac_servicer = services.MacServicer() |
| |
| template = mac.mac_key_templates.HMAC_SHA256_128BITTAG.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=gen_response.keyset)) |
| creation_response = mac_servicer.Create( |
| creation_request, self._ctx) |
| self.assertEmpty(creation_response.err) |
| |
| def test_create_mac_broken_keyset(self): |
| mac_servicer = services.MacServicer() |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=b'\x80')) |
| creation_response = mac_servicer.Create(creation_request, self._ctx) |
| self.assertNotEmpty(creation_response.err) |
| |
| def test_generate_compute_verify_mac(self): |
| keyset_servicer = services.KeysetServicer() |
| mac_servicer = services.MacServicer() |
| |
| template = mac.mac_key_templates.HMAC_SHA256_128BITTAG.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| keyset = gen_response.keyset |
| data = b'The quick brown fox jumps over the lazy dog' |
| comp_request = testing_api_pb2.ComputeMacRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset), |
| data=data) |
| comp_response = mac_servicer.ComputeMac(comp_request, self._ctx) |
| self.assertEqual(comp_response.WhichOneof('result'), 'mac_value') |
| mac_value = comp_response.mac_value |
| verify_request = testing_api_pb2.VerifyMacRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset), |
| mac_value=mac_value, |
| data=data) |
| verify_response = mac_servicer.VerifyMac(verify_request, self._ctx) |
| self.assertEmpty(verify_response.err) |
| |
| def test_generate_compute_verify_mac_fail(self): |
| keyset_servicer = services.KeysetServicer() |
| mac_servicer = services.MacServicer() |
| |
| template = mac.mac_key_templates.HMAC_SHA256_128BITTAG.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| keyset = gen_response.keyset |
| |
| verify_request = testing_api_pb2.VerifyMacRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset), |
| mac_value=b'invalid mac_value', |
| data=b'data') |
| verify_response = mac_servicer.VerifyMac(verify_request, self._ctx) |
| self.assertNotEmpty(verify_response.err) |
| |
| def test_create_hybrid_decrypt(self): |
| keyset_servicer = services.KeysetServicer() |
| hybrid_servicer = services.HybridServicer() |
| |
| tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM |
| template = tp.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=gen_response.keyset)) |
| creation_response = hybrid_servicer.CreateHybridDecrypt( |
| creation_request, self._ctx) |
| self.assertEmpty(creation_response.err) |
| |
| def test_create_hybrid_decrypt_bad_keyset(self): |
| hybrid_servicer = services.HybridServicer() |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=b'\x80')) |
| creation_response = hybrid_servicer.CreateHybridDecrypt( |
| creation_request, self._ctx) |
| self.assertNotEmpty(creation_response.err) |
| |
| def test_create_hybrid_encrypt(self): |
| keyset_servicer = services.KeysetServicer() |
| hybrid_servicer = services.HybridServicer() |
| |
| tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM |
| template = tp.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| pub_request = testing_api_pb2.KeysetPublicRequest( |
| private_keyset=gen_response.keyset) |
| pub_response = keyset_servicer.Public(pub_request, self._ctx) |
| self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset') |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=pub_response.public_keyset)) |
| creation_response = hybrid_servicer.CreateHybridEncrypt( |
| creation_request, self._ctx) |
| self.assertEmpty(creation_response.err) |
| |
| def test_create_hybrid_encrypt_bad_keyset(self): |
| hybrid_servicer = services.HybridServicer() |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=b'\x80')) |
| creation_response = hybrid_servicer.CreateHybridEncrypt( |
| creation_request, self._ctx) |
| self.assertNotEmpty(creation_response.err) |
| |
| def test_generate_hybrid_encrypt_decrypt(self): |
| keyset_servicer = services.KeysetServicer() |
| hybrid_servicer = services.HybridServicer() |
| |
| tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM |
| template = tp.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEmpty(gen_response.err) |
| private_keyset = gen_response.keyset |
| |
| pub_request = testing_api_pb2.KeysetPublicRequest( |
| private_keyset=private_keyset) |
| pub_response = keyset_servicer.Public(pub_request, self._ctx) |
| self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset') |
| public_keyset = pub_response.public_keyset |
| |
| plaintext = b'The quick brown fox jumps over the lazy dog' |
| context_info = b'context_info' |
| enc_request = testing_api_pb2.HybridEncryptRequest( |
| public_annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=public_keyset), |
| plaintext=plaintext, |
| context_info=context_info) |
| enc_response = hybrid_servicer.Encrypt(enc_request, self._ctx) |
| self.assertEqual(enc_response.WhichOneof('result'), 'ciphertext') |
| ciphertext = enc_response.ciphertext |
| |
| dec_request = testing_api_pb2.HybridDecryptRequest( |
| private_annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=private_keyset), |
| ciphertext=ciphertext, |
| context_info=context_info) |
| dec_response = hybrid_servicer.Decrypt(dec_request, self._ctx) |
| self.assertEqual(dec_response.WhichOneof('result'), 'plaintext') |
| self.assertEqual(dec_response.plaintext, plaintext) |
| |
| def test_generate_hybrid_encrypt_decrypt_fail(self): |
| keyset_servicer = services.KeysetServicer() |
| hybrid_servicer = services.HybridServicer() |
| |
| tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM |
| template = tp.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| private_keyset = gen_response.keyset |
| |
| dec_request = testing_api_pb2.HybridDecryptRequest( |
| private_annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=private_keyset), |
| ciphertext=b'invalid ciphertext', |
| context_info=b'context_info') |
| dec_response = hybrid_servicer.Decrypt(dec_request, self._ctx) |
| self.assertEqual(dec_response.WhichOneof('result'), 'err') |
| self.assertNotEmpty(dec_response.err) |
| |
| def test_create_public_key_sign(self): |
| keyset_servicer = services.KeysetServicer() |
| signature_servicer = services.SignatureServicer() |
| |
| template = signature.signature_key_templates.ECDSA_P256.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=gen_response.keyset)) |
| creation_response = signature_servicer.CreatePublicKeySign( |
| creation_request, self._ctx) |
| self.assertEmpty(creation_response.err) |
| |
| def test_create_public_key_sign_bad_keyset(self): |
| signature_servicer = services.SignatureServicer() |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=b'\x80')) |
| creation_response = signature_servicer.CreatePublicKeySign( |
| creation_request, self._ctx) |
| self.assertNotEmpty(creation_response.err) |
| |
| def test_create_public_key_verify(self): |
| keyset_servicer = services.KeysetServicer() |
| signature_servicer = services.SignatureServicer() |
| |
| template = signature.signature_key_templates.ECDSA_P256.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| pub_request = testing_api_pb2.KeysetPublicRequest( |
| private_keyset=gen_response.keyset) |
| pub_response = keyset_servicer.Public(pub_request, self._ctx) |
| self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset') |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=pub_response.public_keyset)) |
| creation_response = signature_servicer.CreatePublicKeyVerify( |
| creation_request, self._ctx) |
| self.assertEmpty(creation_response.err) |
| |
| def test_create_public_key_verify_bad_keyset(self): |
| signature_servicer = services.SignatureServicer() |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=b'\x80')) |
| creation_response = signature_servicer.CreatePublicKeyVerify( |
| creation_request, self._ctx) |
| self.assertNotEmpty(creation_response.err) |
| |
| def test_sign_verify(self): |
| keyset_servicer = services.KeysetServicer() |
| signature_servicer = services.SignatureServicer() |
| |
| template = signature.signature_key_templates.ECDSA_P256.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| private_keyset = gen_response.keyset |
| |
| pub_request = testing_api_pb2.KeysetPublicRequest( |
| private_keyset=private_keyset) |
| pub_response = keyset_servicer.Public(pub_request, self._ctx) |
| self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset') |
| public_keyset = pub_response.public_keyset |
| |
| data = b'The quick brown fox jumps over the lazy dog' |
| |
| sign_request = testing_api_pb2.SignatureSignRequest( |
| private_annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=private_keyset), |
| data=data) |
| sign_response = signature_servicer.Sign(sign_request, self._ctx) |
| self.assertEqual(sign_response.WhichOneof('result'), 'signature') |
| a_signature = sign_response.signature |
| |
| verify_request = testing_api_pb2.SignatureVerifyRequest( |
| public_annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=public_keyset), |
| signature=a_signature, |
| data=data) |
| verify_response = signature_servicer.Verify(verify_request, self._ctx) |
| self.assertEmpty(verify_response.err) |
| |
| def test_sign_verify_fail(self): |
| keyset_servicer = services.KeysetServicer() |
| signature_servicer = services.SignatureServicer() |
| |
| template = signature.signature_key_templates.ECDSA_P256.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| self.assertEmpty(gen_response.err) |
| private_keyset = gen_response.keyset |
| |
| pub_request = testing_api_pb2.KeysetPublicRequest( |
| private_keyset=private_keyset) |
| pub_response = keyset_servicer.Public(pub_request, self._ctx) |
| self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset') |
| public_keyset = pub_response.public_keyset |
| |
| invalid_request = testing_api_pb2.SignatureVerifyRequest( |
| public_annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=public_keyset), |
| signature=b'invalid signature', |
| data=b'The quick brown fox jumps over the lazy dog') |
| invalid_response = signature_servicer.Verify(invalid_request, self._ctx) |
| self.assertNotEmpty(invalid_response.err) |
| |
| def test_create_prf_set(self): |
| keyset_servicer = services.KeysetServicer() |
| prf_set_servicer = services.PrfSetServicer() |
| |
| template = prf.prf_key_templates.HMAC_SHA256.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=gen_response.keyset)) |
| creation_response = prf_set_servicer.Create(creation_request, self._ctx) |
| self.assertEmpty(creation_response.err) |
| |
| def test_create_prf_set_wrong_keyset(self): |
| prf_set_servicer = services.PrfSetServicer() |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=b'\x80')) |
| creation_response = prf_set_servicer.Create(creation_request, self._ctx) |
| self.assertNotEmpty(creation_response.err) |
| |
| def test_compute_prf(self): |
| keyset_servicer = services.KeysetServicer() |
| prf_set_servicer = services.PrfSetServicer() |
| template = prf.prf_key_templates.HMAC_SHA256.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| keyset = gen_response.keyset |
| |
| key_ids_request = testing_api_pb2.PrfSetKeyIdsRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset)) |
| key_ids_response = prf_set_servicer.KeyIds(key_ids_request, self._ctx) |
| self.assertEqual(key_ids_response.WhichOneof('result'), 'output') |
| self.assertLen(key_ids_response.output.key_id, 1) |
| self.assertEqual(key_ids_response.output.key_id[0], |
| key_ids_response.output.primary_key_id) |
| |
| output_length = 31 |
| compute_request = testing_api_pb2.PrfSetComputeRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset), |
| key_id=key_ids_response.output.primary_key_id, |
| input_data=b'input_data', |
| output_length=output_length) |
| compute_response = prf_set_servicer.Compute(compute_request, self._ctx) |
| self.assertEqual(compute_response.WhichOneof('result'), 'output') |
| self.assertLen(compute_response.output, output_length) |
| |
| def test_key_ids_prf_fail(self): |
| prf_set_servicer = services.PrfSetServicer() |
| invalid_key_ids_response = prf_set_servicer.KeyIds( |
| testing_api_pb2.PrfSetKeyIdsRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=b'badkeyset')), self._ctx) |
| self.assertNotEmpty(invalid_key_ids_response.err) |
| |
| def test_compute_prf_fail(self): |
| keyset_servicer = services.KeysetServicer() |
| prf_set_servicer = services.PrfSetServicer() |
| template = prf.prf_key_templates.HMAC_SHA256.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| keyset = gen_response.keyset |
| key_ids_request = testing_api_pb2.PrfSetKeyIdsRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset)) |
| key_ids_response = prf_set_servicer.KeyIds(key_ids_request, self._ctx) |
| self.assertEqual(key_ids_response.WhichOneof('result'), 'output') |
| primary_key_id = key_ids_response.output.primary_key_id |
| |
| invalid_output_length = 123456 |
| invalid_compute_request = testing_api_pb2.PrfSetComputeRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset), |
| key_id=primary_key_id, |
| input_data=b'input_data', |
| output_length=invalid_output_length) |
| invalid_compute_response = prf_set_servicer.Compute(invalid_compute_request, |
| self._ctx) |
| self.assertEqual(invalid_compute_response.WhichOneof('result'), 'err') |
| self.assertNotEmpty(invalid_compute_response.err) |
| |
| def test_create_streaming_aead(self): |
| keyset_servicer = services.KeysetServicer() |
| streaming_aead_servicer = services.StreamingAeadServicer() |
| |
| templates = streaming_aead.streaming_aead_key_templates |
| template = templates.AES128_CTR_HMAC_SHA256_4KB.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=gen_response.keyset)) |
| creation_response = streaming_aead_servicer.Create( |
| creation_request, self._ctx) |
| self.assertEmpty(creation_response.err) |
| |
| def test_create_streaming_aead_broken_keyset(self): |
| streaming_aead_servicer = services.StreamingAeadServicer() |
| |
| creation_request = testing_api_pb2.CreationRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=b'\x80')) |
| creation_response = streaming_aead_servicer.Create(creation_request, |
| self._ctx) |
| self.assertNotEmpty(creation_response.err) |
| |
| def test_generate_streaming_encrypt_decrypt(self): |
| keyset_servicer = services.KeysetServicer() |
| streaming_aead_servicer = services.StreamingAeadServicer() |
| |
| templates = streaming_aead.streaming_aead_key_templates |
| template = templates.AES128_CTR_HMAC_SHA256_4KB.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| keyset = gen_response.keyset |
| plaintext = b'The quick brown fox jumps over the lazy dog' |
| associated_data = b'associated_data' |
| |
| enc_request = testing_api_pb2.StreamingAeadEncryptRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset), |
| plaintext=plaintext, |
| associated_data=associated_data) |
| enc_response = streaming_aead_servicer.Encrypt(enc_request, self._ctx) |
| self.assertEqual(enc_response.WhichOneof('result'), 'ciphertext') |
| ciphertext = enc_response.ciphertext |
| |
| dec_request = testing_api_pb2.StreamingAeadDecryptRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset), |
| ciphertext=ciphertext, |
| associated_data=associated_data) |
| dec_response = streaming_aead_servicer.Decrypt(dec_request, self._ctx) |
| self.assertEqual(dec_response.WhichOneof('result'), 'plaintext') |
| |
| self.assertEqual(dec_response.plaintext, plaintext) |
| |
| def test_generate_streaming_decrypt_fail(self): |
| keyset_servicer = services.KeysetServicer() |
| streaming_aead_servicer = services.StreamingAeadServicer() |
| |
| templates = streaming_aead.streaming_aead_key_templates |
| template = templates.AES128_CTR_HMAC_SHA256_4KB.SerializeToString() |
| gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) |
| gen_response = keyset_servicer.Generate(gen_request, self._ctx) |
| self.assertEqual(gen_response.WhichOneof('result'), 'keyset') |
| keyset = gen_response.keyset |
| |
| ciphertext = b'some invalid ciphertext' |
| associated_data = b'associated_data' |
| dec_request = testing_api_pb2.StreamingAeadDecryptRequest( |
| annotated_keyset=testing_api_pb2.AnnotatedKeyset( |
| serialized_keyset=keyset), |
| ciphertext=ciphertext, |
| associated_data=associated_data) |
| dec_response = streaming_aead_servicer.Decrypt(dec_request, self._ctx) |
| self.assertEqual(dec_response.WhichOneof('result'), 'err') |
| self.assertNotEmpty(dec_response.err) |
| |
| |
| if __name__ == '__main__': |
| absltest.main() |