| # Copyright 2020 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS-IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Testing service API implementations in Python.""" |
| |
| import io |
| |
| import grpc |
| import tink |
| from tink import aead |
| from tink import cleartext_keyset_handle |
| from tink import daead |
| from tink import hybrid |
| from tink import jwt |
| from tink import mac |
| from tink import prf |
| from tink import signature |
| from tink import streaming_aead |
| from tink.proto import tink_pb2 |
| from tink.testing import bytes_io |
| from protos import testing_api_pb2 |
| from protos import testing_api_pb2_grpc |
| |
| |
def _all_key_templates():
  """Builds the lookup table of key templates served by `GetTemplate`.

  Returns:
    A dict mapping each template name accepted in requests to the
    corresponding key template object from the Tink Python API.
  """
  aead_t = aead.aead_key_templates
  daead_t = daead.deterministic_aead_key_templates
  saead_t = streaming_aead.streaming_aead_key_templates
  hyb_t = hybrid.hybrid_key_templates
  mac_t = mac.mac_key_templates
  sig_t = signature.signature_key_templates
  prf_t = prf.prf_key_templates

  return {
      # AEAD.
      'AES128_EAX': aead_t.AES128_EAX,
      'AES128_EAX_RAW': aead_t.AES128_EAX_RAW,
      'AES256_EAX': aead_t.AES256_EAX,
      'AES256_EAX_RAW': aead_t.AES256_EAX_RAW,
      'AES128_GCM': aead_t.AES128_GCM,
      'AES128_GCM_RAW': aead_t.AES128_GCM_RAW,
      'AES256_GCM': aead_t.AES256_GCM,
      'AES256_GCM_RAW': aead_t.AES256_GCM_RAW,
      'AES128_GCM_SIV': aead_t.AES128_GCM_SIV,
      'AES128_GCM_SIV_RAW': aead_t.AES128_GCM_SIV_RAW,
      'AES256_GCM_SIV': aead_t.AES256_GCM_SIV,
      'AES256_GCM_SIV_RAW': aead_t.AES256_GCM_SIV_RAW,
      'AES128_CTR_HMAC_SHA256': aead_t.AES128_CTR_HMAC_SHA256,
      'AES128_CTR_HMAC_SHA256_RAW': aead_t.AES128_CTR_HMAC_SHA256_RAW,
      'AES256_CTR_HMAC_SHA256': aead_t.AES256_CTR_HMAC_SHA256,
      'AES256_CTR_HMAC_SHA256_RAW': aead_t.AES256_CTR_HMAC_SHA256_RAW,
      'XCHACHA20_POLY1305': aead_t.XCHACHA20_POLY1305,
      'XCHACHA20_POLY1305_RAW': aead_t.XCHACHA20_POLY1305_RAW,
      # Deterministic AEAD.
      'AES256_SIV': daead_t.AES256_SIV,
      # Streaming AEAD.
      'AES128_CTR_HMAC_SHA256_4KB': saead_t.AES128_CTR_HMAC_SHA256_4KB,
      'AES128_CTR_HMAC_SHA256_1MB': saead_t.AES128_CTR_HMAC_SHA256_1MB,
      'AES256_CTR_HMAC_SHA256_4KB': saead_t.AES256_CTR_HMAC_SHA256_4KB,
      'AES256_CTR_HMAC_SHA256_1MB': saead_t.AES256_CTR_HMAC_SHA256_1MB,
      'AES128_GCM_HKDF_4KB': saead_t.AES128_GCM_HKDF_4KB,
      'AES128_GCM_HKDF_1MB': saead_t.AES128_GCM_HKDF_1MB,
      'AES256_GCM_HKDF_4KB': saead_t.AES256_GCM_HKDF_4KB,
      'AES256_GCM_HKDF_1MB': saead_t.AES256_GCM_HKDF_1MB,
      # Hybrid encryption.
      'ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM':
          hyb_t.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM,
      'ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_GCM':
          hyb_t.ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_GCM,
      'ECIES_P256_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256':
          hyb_t.ECIES_P256_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256,
      'ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256':
          hyb_t.ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256,
      'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM':
          hyb_t.DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM,
      'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM_RAW':
          hyb_t.DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM_RAW,
      'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM':
          hyb_t.DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM,
      'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM_RAW':
          hyb_t.DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM_RAW,
      'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305':
          hyb_t.DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305,
      'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305_RAW':
          hyb_t.DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305_RAW,
      # MAC.
      'AES_CMAC': mac_t.AES_CMAC,
      'HMAC_SHA256_128BITTAG': mac_t.HMAC_SHA256_128BITTAG,
      'HMAC_SHA256_256BITTAG': mac_t.HMAC_SHA256_256BITTAG,
      'HMAC_SHA512_256BITTAG': mac_t.HMAC_SHA512_256BITTAG,
      'HMAC_SHA512_512BITTAG': mac_t.HMAC_SHA512_512BITTAG,
      # Digital signatures.
      'ECDSA_P256': sig_t.ECDSA_P256,
      'ECDSA_P256_RAW': sig_t.ECDSA_P256_RAW,
      'ECDSA_P384': sig_t.ECDSA_P384,
      'ECDSA_P384_SHA384': sig_t.ECDSA_P384_SHA384,
      'ECDSA_P384_SHA512': sig_t.ECDSA_P384_SHA512,
      'ECDSA_P521': sig_t.ECDSA_P521,
      'ECDSA_P256_IEEE_P1363': sig_t.ECDSA_P256_IEEE_P1363,
      'ECDSA_P384_IEEE_P1363': sig_t.ECDSA_P384_IEEE_P1363,
      'ECDSA_P384_SHA384_IEEE_P1363': sig_t.ECDSA_P384_SHA384_IEEE_P1363,
      'ECDSA_P521_IEEE_P1363': sig_t.ECDSA_P521_IEEE_P1363,
      'ED25519': sig_t.ED25519,
      'RSA_SSA_PKCS1_3072_SHA256_F4': sig_t.RSA_SSA_PKCS1_3072_SHA256_F4,
      'RSA_SSA_PKCS1_4096_SHA512_F4': sig_t.RSA_SSA_PKCS1_4096_SHA512_F4,
      'RSA_SSA_PSS_3072_SHA256_SHA256_32_F4':
          sig_t.RSA_SSA_PSS_3072_SHA256_SHA256_32_F4,
      'RSA_SSA_PSS_4096_SHA512_SHA512_64_F4':
          sig_t.RSA_SSA_PSS_4096_SHA512_SHA512_64_F4,
      # PRF. The request-facing names differ from the attribute names here.
      'AES_CMAC_PRF': prf_t.AES_CMAC,
      'HMAC_SHA256_PRF': prf_t.HMAC_SHA256,
      'HMAC_SHA512_PRF': prf_t.HMAC_SHA512,
      'HKDF_SHA256': prf_t.HKDF_SHA256,
      # JWT. These templates come from factory functions, called once here.
      'JWT_HS256': jwt.jwt_hs256_template(),
      'JWT_HS256_RAW': jwt.raw_jwt_hs256_template(),
      'JWT_HS384': jwt.jwt_hs384_template(),
      'JWT_HS384_RAW': jwt.raw_jwt_hs384_template(),
      'JWT_HS512': jwt.jwt_hs512_template(),
      'JWT_HS512_RAW': jwt.raw_jwt_hs512_template(),
      'JWT_ES256': jwt.jwt_es256_template(),
      'JWT_ES256_RAW': jwt.raw_jwt_es256_template(),
      'JWT_ES384': jwt.jwt_es384_template(),
      'JWT_ES384_RAW': jwt.raw_jwt_es384_template(),
      'JWT_ES512': jwt.jwt_es512_template(),
      'JWT_ES512_RAW': jwt.raw_jwt_es512_template(),
      'JWT_RS256_2048_F4': jwt.jwt_rs256_2048_f4_template(),
      'JWT_RS256_2048_F4_RAW': jwt.raw_jwt_rs256_2048_f4_template(),
      'JWT_RS256_3072_F4': jwt.jwt_rs256_3072_f4_template(),
      'JWT_RS256_3072_F4_RAW': jwt.raw_jwt_rs256_3072_f4_template(),
      'JWT_RS384_3072_F4': jwt.jwt_rs384_3072_f4_template(),
      'JWT_RS384_3072_F4_RAW': jwt.raw_jwt_rs384_3072_f4_template(),
      'JWT_RS512_4096_F4': jwt.jwt_rs512_4096_f4_template(),
      'JWT_RS512_4096_F4_RAW': jwt.raw_jwt_rs512_4096_f4_template(),
      'JWT_PS256_2048_F4': jwt.jwt_ps256_2048_f4_template(),
      'JWT_PS256_2048_F4_RAW': jwt.raw_jwt_ps256_2048_f4_template(),
      'JWT_PS256_3072_F4': jwt.jwt_ps256_3072_f4_template(),
      'JWT_PS256_3072_F4_RAW': jwt.raw_jwt_ps256_3072_f4_template(),
      'JWT_PS384_3072_F4': jwt.jwt_ps384_3072_f4_template(),
      'JWT_PS384_3072_F4_RAW': jwt.raw_jwt_ps384_3072_f4_template(),
      'JWT_PS512_4096_F4': jwt.jwt_ps512_4096_f4_template(),
      'JWT_PS512_4096_F4_RAW': jwt.raw_jwt_ps512_4096_f4_template(),
  }


# Every key template (as a protobuf) that this server can hand out, keyed by
# the template name used in KeysetTemplateRequest.
_KEY_TEMPLATE = _all_key_templates()
| |
| |
class MetadataServicer(testing_api_pb2_grpc.MetadataServicer):
  """Serves metadata describing this testing server."""

  def GetServerInfo(
      self, request: testing_api_pb2.ServerInfoRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.ServerInfoResponse:
    """Reports the implementation language of this server.

    Args:
      request: Not read by this method.
      context: Not read by this method.

    Returns:
      A ServerInfoResponse whose `language` field is 'python'.
    """
    info = testing_api_pb2.ServerInfoResponse()
    info.language = 'python'
    return info
| |
| |
class KeysetServicer(testing_api_pb2_grpc.KeysetServicer):
  """A service for testing Keyset operations.

  Every method reports `tink.TinkError` failures through the `err` field of
  its response instead of raising.
  """

  @staticmethod
  def _to_binary(keyset_handle) -> bytes:
    """Serializes a keyset handle in the cleartext binary keyset format."""
    buffer = io.BytesIO()
    cleartext_keyset_handle.write(
        tink.BinaryKeysetWriter(buffer), keyset_handle)
    return buffer.getvalue()

  def GetTemplate(
      self, request: testing_api_pb2.KeysetTemplateRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.KeysetTemplateResponse:
    """Returns the key template for the given template name.

    Args:
      request: Carries `template_name`, looked up in `_KEY_TEMPLATE`.
      context: Not read by this method.

    Returns:
      A response with `key_template` set to the serialized template, or with
      `err` set when the name is not in `_KEY_TEMPLATE`.
    """
    name = request.template_name
    if name not in _KEY_TEMPLATE:
      return testing_api_pb2.KeysetTemplateResponse(
          err='template %s not found' % name)
    return testing_api_pb2.KeysetTemplateResponse(
        key_template=_KEY_TEMPLATE[name].SerializeToString())

  def Generate(
      self, request: testing_api_pb2.KeysetGenerateRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.KeysetGenerateResponse:
    """Generates a keyset from the serialized key template in the request.

    Args:
      request: Carries `template`, a serialized `tink_pb2.KeyTemplate`.
      context: Not read by this method.

    Returns:
      A response with `keyset` set to the new keyset in binary format, or with
      `err` set to the message of the TinkError that occurred.
    """
    try:
      template = tink_pb2.KeyTemplate()
      template.ParseFromString(request.template)
      keyset_handle = tink.new_keyset_handle(template)
      return testing_api_pb2.KeysetGenerateResponse(
          keyset=self._to_binary(keyset_handle))
    except tink.TinkError as e:
      return testing_api_pb2.KeysetGenerateResponse(err=str(e))

  def Public(
      self, request: testing_api_pb2.KeysetPublicRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.KeysetPublicResponse:
    """Generates a public-key keyset from a private-key keyset.

    Args:
      request: Carries `private_keyset` in binary format.
      context: Not read by this method.

    Returns:
      A response with `public_keyset` set to the binary public keyset, or with
      `err` set to the message of the TinkError that occurred.
    """
    try:
      private_keyset_handle = cleartext_keyset_handle.read(
          tink.BinaryKeysetReader(request.private_keyset))
      public_keyset_handle = private_keyset_handle.public_keyset_handle()
      return testing_api_pb2.KeysetPublicResponse(
          public_keyset=self._to_binary(public_keyset_handle))
    except tink.TinkError as e:
      return testing_api_pb2.KeysetPublicResponse(err=str(e))

  def ToJson(
      self, request: testing_api_pb2.KeysetToJsonRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.KeysetToJsonResponse:
    """Converts a keyset from binary to JSON format.

    Args:
      request: Carries `keyset` in binary format.
      context: Not read by this method.

    Returns:
      A response with `json_keyset` set to the JSON text, or with `err` set to
      the message of the TinkError that occurred.
    """
    try:
      keyset_handle = cleartext_keyset_handle.read(
          tink.BinaryKeysetReader(request.keyset))
      json_keyset = io.StringIO()
      cleartext_keyset_handle.write(
          tink.JsonKeysetWriter(json_keyset), keyset_handle)
      return testing_api_pb2.KeysetToJsonResponse(
          json_keyset=json_keyset.getvalue())
    except tink.TinkError as e:
      return testing_api_pb2.KeysetToJsonResponse(err=str(e))

  def FromJson(
      self, request: testing_api_pb2.KeysetFromJsonRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.KeysetFromJsonResponse:
    """Converts a keyset from JSON to binary format.

    Args:
      request: Carries `json_keyset` as JSON text.
      context: Not read by this method.

    Returns:
      A response with `keyset` set to the binary keyset, or with `err` set to
      the message of the TinkError that occurred.
    """
    try:
      keyset_handle = cleartext_keyset_handle.read(
          tink.JsonKeysetReader(request.json_keyset))
      return testing_api_pb2.KeysetFromJsonResponse(
          keyset=self._to_binary(keyset_handle))
    except tink.TinkError as e:
      return testing_api_pb2.KeysetFromJsonResponse(err=str(e))

  def ReadEncrypted(
      self, request: testing_api_pb2.KeysetReadEncryptedRequest,
      context: grpc.ServicerContext
  ) -> testing_api_pb2.KeysetReadEncryptedResponse:
    """Reads an encrypted keyset.

    The keyset in `encrypted_keyset` is decrypted with an AEAD built from
    `master_keyset`, optionally using `associated_data` when that field is
    present.

    Args:
      request: Carries the master keyset, the encrypted keyset, the reader type
        and the optional associated data.
      context: Not read by this method.

    Returns:
      A response with `keyset` set to the decrypted keyset in binary format,
      or with `err` set when a TinkError occurs or when a JSON keyset is not
      valid UTF-8.

    Raises:
      ValueError: If `keyset_reader_type` is neither binary nor JSON.
    """
    try:
      master_keyset_handle = cleartext_keyset_handle.read(
          tink.BinaryKeysetReader(request.master_keyset))
      master_aead = master_keyset_handle.primitive(aead.Aead)

      reader_type = request.keyset_reader_type
      if reader_type == testing_api_pb2.KEYSET_READER_BINARY:
        reader = tink.BinaryKeysetReader(request.encrypted_keyset)
      elif reader_type == testing_api_pb2.KEYSET_READER_JSON:
        # The JSON keyset arrives as bytes. Undecodable input is a malformed
        # keyset, so report it via `err` like any other unreadable keyset
        # rather than letting UnicodeDecodeError escape the handler.
        try:
          json_text = request.encrypted_keyset.decode('utf8')
        except UnicodeDecodeError as e:
          return testing_api_pb2.KeysetReadEncryptedResponse(
              err='encrypted keyset is not valid UTF-8: %s' % e)
        reader = tink.JsonKeysetReader(json_text)
      else:
        # Not a TinkError, so this propagates out of the method.
        raise ValueError('unknown keyset reader type')

      if request.HasField('associated_data'):
        keyset_handle = tink.read_keyset_handle_with_associated_data(
            reader, master_aead, request.associated_data.value)
      else:
        keyset_handle = tink.read_keyset_handle(reader, master_aead)

      return testing_api_pb2.KeysetReadEncryptedResponse(
          keyset=self._to_binary(keyset_handle))
    except tink.TinkError as e:
      return testing_api_pb2.KeysetReadEncryptedResponse(err=str(e))

  def WriteEncrypted(
      self, request: testing_api_pb2.KeysetWriteEncryptedRequest,
      context: grpc.ServicerContext
  ) -> testing_api_pb2.KeysetWriteEncryptedResponse:
    """Writes an encrypted keyset.

    The keyset in `keyset` is encrypted with an AEAD built from
    `master_keyset`, optionally using `associated_data` when that field is
    present.

    Args:
      request: Carries the master keyset, the keyset to encrypt, the writer
        type and the optional associated data.
      context: Not read by this method.

    Returns:
      A response with `encrypted_keyset` set to the encrypted keyset as bytes
      (UTF-8 encoded for the JSON writer), or with `err` set to the message of
      the TinkError that occurred.

    Raises:
      ValueError: If `keyset_writer_type` is neither binary nor JSON.
    """
    try:
      master_keyset_handle = cleartext_keyset_handle.read(
          tink.BinaryKeysetReader(request.master_keyset))
      keyset_handle = cleartext_keyset_handle.read(
          tink.BinaryKeysetReader(request.keyset))
      master_aead = master_keyset_handle.primitive(aead.Aead)

      writer_type = request.keyset_writer_type
      is_json = writer_type == testing_api_pb2.KEYSET_WRITER_JSON
      if writer_type == testing_api_pb2.KEYSET_WRITER_BINARY:
        destination = io.BytesIO()
        writer = tink.BinaryKeysetWriter(destination)
      elif is_json:
        destination = io.StringIO()
        writer = tink.JsonKeysetWriter(destination)
      else:
        # Not a TinkError, so this propagates out of the method.
        raise ValueError('unknown keyset writer type')

      if request.HasField('associated_data'):
        keyset_handle.write_with_associated_data(
            writer, master_aead, request.associated_data.value)
      else:
        keyset_handle.write(writer, master_aead)

      encrypted_keyset = destination.getvalue()
      if is_json:
        # The JSON writer produces text; the response field holds bytes.
        encrypted_keyset = encrypted_keyset.encode('utf8')
      return testing_api_pb2.KeysetWriteEncryptedResponse(
          encrypted_keyset=encrypted_keyset)
    except tink.TinkError as e:
      return testing_api_pb2.KeysetWriteEncryptedResponse(err=str(e))
| |
| |
class AeadServicer(testing_api_pb2_grpc.AeadServicer):
  """gRPC servicer that exercises the AEAD primitive."""

  def Create(self, request: testing_api_pb2.CreationRequest,
             context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
    """Builds an AEAD from the request keyset and discards it.

    Args:
      request: Carries the binary keyset in
        `annotated_keyset.serialized_keyset`.
      context: Not read by this method.

    Returns:
      An empty CreationResponse on success, or one with `err` set to the
      message of the TinkError that occurred.
    """
    serialized = request.annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      handle = cleartext_keyset_handle.read(reader)
      handle.primitive(aead.Aead)
    except tink.TinkError as e:
      return testing_api_pb2.CreationResponse(err=str(e))
    return testing_api_pb2.CreationResponse()

  def Encrypt(
      self, request: testing_api_pb2.AeadEncryptRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.AeadEncryptResponse:
    """Encrypts `plaintext` with `associated_data`.

    The keyset is parsed and the primitive created before the try block, so
    a TinkError raised at that stage propagates out of this method rather
    than being reported in `err`.

    Args:
      request: Carries the keyset, the plaintext and the associated data.
      context: Not read by this method.

    Returns:
      A response with `ciphertext` set, or with `err` set to the message of
      the TinkError raised by the encryption call.
    """
    serialized = request.annotated_keyset.serialized_keyset
    handle = cleartext_keyset_handle.read(tink.BinaryKeysetReader(serialized))
    primitive = handle.primitive(aead.Aead)
    try:
      ciphertext = primitive.encrypt(request.plaintext,
                                     request.associated_data)
    except tink.TinkError as e:
      return testing_api_pb2.AeadEncryptResponse(err=str(e))
    return testing_api_pb2.AeadEncryptResponse(ciphertext=ciphertext)

  def Decrypt(
      self, request: testing_api_pb2.AeadDecryptRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.AeadDecryptResponse:
    """Decrypts `ciphertext` with `associated_data`.

    The keyset is parsed and the primitive created before the try block, so
    a TinkError raised at that stage propagates out of this method rather
    than being reported in `err`.

    Args:
      request: Carries the keyset, the ciphertext and the associated data.
      context: Not read by this method.

    Returns:
      A response with `plaintext` set, or with `err` set to the message of
      the TinkError raised by the decryption call.
    """
    serialized = request.annotated_keyset.serialized_keyset
    handle = cleartext_keyset_handle.read(tink.BinaryKeysetReader(serialized))
    primitive = handle.primitive(aead.Aead)
    try:
      plaintext = primitive.decrypt(request.ciphertext,
                                    request.associated_data)
    except tink.TinkError as e:
      return testing_api_pb2.AeadDecryptResponse(err=str(e))
    return testing_api_pb2.AeadDecryptResponse(plaintext=plaintext)
| |
| |
class StreamingAeadServicer(testing_api_pb2_grpc.StreamingAeadServicer):
  """gRPC servicer that exercises the StreamingAEAD primitive."""

  def Create(self, request: testing_api_pb2.CreationRequest,
             context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
    """Builds a StreamingAead from the request keyset and discards it.

    Args:
      request: Carries the binary keyset in
        `annotated_keyset.serialized_keyset`.
      context: Not read by this method.

    Returns:
      An empty CreationResponse on success, or one with `err` set to the
      message of the TinkError that occurred.
    """
    serialized = request.annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      handle = cleartext_keyset_handle.read(reader)
      handle.primitive(streaming_aead.StreamingAead)
    except tink.TinkError as e:
      return testing_api_pb2.CreationResponse(err=str(e))
    return testing_api_pb2.CreationResponse()

  def Encrypt(
      self, request: testing_api_pb2.StreamingAeadEncryptRequest,
      context: grpc.ServicerContext
  ) -> testing_api_pb2.StreamingAeadEncryptResponse:
    """Encrypts `plaintext` by writing it through an encrypting stream.

    Args:
      request: Carries the keyset, the plaintext and the associated data.
      context: Not read by this method.

    Returns:
      A response with `ciphertext` set, or with `err` set to the message of
      the TinkError that occurred.
    """
    serialized = request.annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      handle = cleartext_keyset_handle.read(reader)
      primitive = handle.primitive(streaming_aead.StreamingAead)
      # The destination's contents are fetched via value_after_close() once
      # the with-block has exited.
      sink = bytes_io.BytesIOWithValueAfterClose()
      with primitive.new_encrypting_stream(
          sink, request.associated_data) as encrypting_stream:
        encrypting_stream.write(request.plaintext)
      ciphertext = sink.value_after_close()
    except tink.TinkError as e:
      return testing_api_pb2.StreamingAeadEncryptResponse(err=str(e))
    return testing_api_pb2.StreamingAeadEncryptResponse(ciphertext=ciphertext)

  def Decrypt(
      self, request: testing_api_pb2.StreamingAeadDecryptRequest,
      context: grpc.ServicerContext
  ) -> testing_api_pb2.StreamingAeadDecryptResponse:
    """Decrypts `ciphertext` by reading it through a decrypting stream.

    Args:
      request: Carries the keyset, the ciphertext and the associated data.
      context: Not read by this method.

    Returns:
      A response with `plaintext` set, or with `err` set to the message of
      the TinkError that occurred.
    """
    serialized = request.annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      handle = cleartext_keyset_handle.read(reader)
      primitive = handle.primitive(streaming_aead.StreamingAead)
      source = io.BytesIO(request.ciphertext)
      with primitive.new_decrypting_stream(
          source, request.associated_data) as decrypting_stream:
        plaintext = decrypting_stream.read()
    except tink.TinkError as e:
      return testing_api_pb2.StreamingAeadDecryptResponse(err=str(e))
    return testing_api_pb2.StreamingAeadDecryptResponse(plaintext=plaintext)
| |
| |
class DeterministicAeadServicer(testing_api_pb2_grpc.DeterministicAeadServicer):
  """gRPC servicer that exercises the Deterministic AEAD primitive."""

  def Create(self, request: testing_api_pb2.CreationRequest,
             context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
    """Builds a DeterministicAead from the request keyset and discards it.

    Args:
      request: Carries the binary keyset in
        `annotated_keyset.serialized_keyset`.
      context: Not read by this method.

    Returns:
      An empty CreationResponse on success, or one with `err` set to the
      message of the TinkError that occurred.
    """
    serialized = request.annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      handle = cleartext_keyset_handle.read(reader)
      handle.primitive(daead.DeterministicAead)
    except tink.TinkError as e:
      return testing_api_pb2.CreationResponse(err=str(e))
    return testing_api_pb2.CreationResponse()

  def EncryptDeterministically(
      self, request: testing_api_pb2.DeterministicAeadEncryptRequest,
      context: grpc.ServicerContext
  ) -> testing_api_pb2.DeterministicAeadEncryptResponse:
    """Encrypts `plaintext` with `associated_data`.

    The keyset is parsed and the primitive created before the try block, so
    a TinkError raised at that stage propagates out of this method rather
    than being reported in `err`.

    Args:
      request: Carries the keyset, the plaintext and the associated data.
      context: Not read by this method.

    Returns:
      A response with `ciphertext` set, or with `err` set to the message of
      the TinkError raised by the encryption call.
    """
    serialized = request.annotated_keyset.serialized_keyset
    handle = cleartext_keyset_handle.read(tink.BinaryKeysetReader(serialized))
    primitive = handle.primitive(daead.DeterministicAead)
    try:
      ciphertext = primitive.encrypt_deterministically(
          request.plaintext, request.associated_data)
    except tink.TinkError as e:
      return testing_api_pb2.DeterministicAeadEncryptResponse(err=str(e))
    return testing_api_pb2.DeterministicAeadEncryptResponse(
        ciphertext=ciphertext)

  def DecryptDeterministically(
      self, request: testing_api_pb2.DeterministicAeadDecryptRequest,
      context: grpc.ServicerContext
  ) -> testing_api_pb2.DeterministicAeadDecryptResponse:
    """Decrypts `ciphertext` with `associated_data`.

    The keyset is parsed and the primitive created before the try block, so
    a TinkError raised at that stage propagates out of this method rather
    than being reported in `err`.

    Args:
      request: Carries the keyset, the ciphertext and the associated data.
      context: Not read by this method.

    Returns:
      A response with `plaintext` set, or with `err` set to the message of
      the TinkError raised by the decryption call.
    """
    serialized = request.annotated_keyset.serialized_keyset
    handle = cleartext_keyset_handle.read(tink.BinaryKeysetReader(serialized))
    primitive = handle.primitive(daead.DeterministicAead)
    try:
      plaintext = primitive.decrypt_deterministically(
          request.ciphertext, request.associated_data)
    except tink.TinkError as e:
      return testing_api_pb2.DeterministicAeadDecryptResponse(err=str(e))
    return testing_api_pb2.DeterministicAeadDecryptResponse(
        plaintext=plaintext)
| |
| |
class MacServicer(testing_api_pb2_grpc.MacServicer):
  """gRPC servicer that exercises the MAC primitive."""

  def Create(self, request: testing_api_pb2.CreationRequest,
             context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
    """Builds a Mac from the request keyset and discards it.

    Args:
      request: Carries the binary keyset in
        `annotated_keyset.serialized_keyset`.
      context: Not read by this method.

    Returns:
      An empty CreationResponse on success, or one with `err` set to the
      message of the TinkError that occurred.
    """
    serialized = request.annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      handle = cleartext_keyset_handle.read(reader)
      handle.primitive(mac.Mac)
    except tink.TinkError as e:
      return testing_api_pb2.CreationResponse(err=str(e))
    return testing_api_pb2.CreationResponse()

  def ComputeMac(
      self, request: testing_api_pb2.ComputeMacRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.ComputeMacResponse:
    """Computes the MAC of `data`.

    Args:
      request: Carries the keyset and the data to authenticate.
      context: Not read by this method.

    Returns:
      A response with `mac_value` set, or with `err` set to the message of
      the TinkError that occurred.
    """
    serialized = request.annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      handle = cleartext_keyset_handle.read(reader)
      tag = handle.primitive(mac.Mac).compute_mac(request.data)
    except tink.TinkError as e:
      return testing_api_pb2.ComputeMacResponse(err=str(e))
    return testing_api_pb2.ComputeMacResponse(mac_value=tag)

  def VerifyMac(
      self, request: testing_api_pb2.VerifyMacRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.VerifyMacResponse:
    """Verifies `mac_value` against `data`.

    Args:
      request: Carries the keyset, the MAC value and the data.
      context: Not read by this method.

    Returns:
      An empty response when verification raises no TinkError, otherwise a
      response with `err` set to the error message.
    """
    serialized = request.annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      handle = cleartext_keyset_handle.read(reader)
      handle.primitive(mac.Mac).verify_mac(request.mac_value, request.data)
    except tink.TinkError as e:
      return testing_api_pb2.VerifyMacResponse(err=str(e))
    return testing_api_pb2.VerifyMacResponse()
| |
| |
class HybridServicer(testing_api_pb2_grpc.HybridServicer):
  """gRPC servicer that exercises hybrid encryption and decryption."""

  def CreateHybridEncrypt(
      self, request: testing_api_pb2.CreationRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
    """Builds a HybridEncrypt from the request keyset and discards it.

    Args:
      request: Carries the binary keyset in
        `annotated_keyset.serialized_keyset`.
      context: Not read by this method.

    Returns:
      An empty CreationResponse on success, or one with `err` set to the
      message of the TinkError that occurred.
    """
    serialized = request.annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      handle = cleartext_keyset_handle.read(reader)
      handle.primitive(hybrid.HybridEncrypt)
    except tink.TinkError as e:
      return testing_api_pb2.CreationResponse(err=str(e))
    return testing_api_pb2.CreationResponse()

  def CreateHybridDecrypt(
      self, request: testing_api_pb2.CreationRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
    """Builds a HybridDecrypt from the request keyset and discards it.

    Args:
      request: Carries the binary keyset in
        `annotated_keyset.serialized_keyset`.
      context: Not read by this method.

    Returns:
      An empty CreationResponse on success, or one with `err` set to the
      message of the TinkError that occurred.
    """
    serialized = request.annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      handle = cleartext_keyset_handle.read(reader)
      handle.primitive(hybrid.HybridDecrypt)
    except tink.TinkError as e:
      return testing_api_pb2.CreationResponse(err=str(e))
    return testing_api_pb2.CreationResponse()

  def Encrypt(
      self, request: testing_api_pb2.HybridEncryptRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.HybridEncryptResponse:
    """Encrypts `plaintext` with `context_info` using the public keyset.

    Args:
      request: Carries the public keyset, the plaintext and the context info.
      context: Not read by this method.

    Returns:
      A response with `ciphertext` set, or with `err` set to the message of
      the TinkError that occurred.
    """
    serialized = request.public_annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      public_handle = cleartext_keyset_handle.read(reader)
      encrypter = public_handle.primitive(hybrid.HybridEncrypt)
      ciphertext = encrypter.encrypt(request.plaintext, request.context_info)
    except tink.TinkError as e:
      return testing_api_pb2.HybridEncryptResponse(err=str(e))
    return testing_api_pb2.HybridEncryptResponse(ciphertext=ciphertext)

  def Decrypt(
      self, request: testing_api_pb2.HybridDecryptRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.HybridDecryptResponse:
    """Decrypts `ciphertext` with `context_info` using the private keyset.

    Args:
      request: Carries the private keyset, the ciphertext and the context
        info.
      context: Not read by this method.

    Returns:
      A response with `plaintext` set, or with `err` set to the message of
      the TinkError that occurred.
    """
    serialized = request.private_annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      private_handle = cleartext_keyset_handle.read(reader)
      decrypter = private_handle.primitive(hybrid.HybridDecrypt)
      plaintext = decrypter.decrypt(request.ciphertext, request.context_info)
    except tink.TinkError as e:
      return testing_api_pb2.HybridDecryptResponse(err=str(e))
    return testing_api_pb2.HybridDecryptResponse(plaintext=plaintext)
| |
| |
class SignatureServicer(testing_api_pb2_grpc.SignatureServicer):
  """gRPC servicer that exercises digital signatures."""

  def CreatePublicKeySign(
      self, request: testing_api_pb2.CreationRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
    """Builds a PublicKeySign from the request keyset and discards it.

    Args:
      request: Carries the binary keyset in
        `annotated_keyset.serialized_keyset`.
      context: Not read by this method.

    Returns:
      An empty CreationResponse on success, or one with `err` set to the
      message of the TinkError that occurred.
    """
    serialized = request.annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      handle = cleartext_keyset_handle.read(reader)
      handle.primitive(signature.PublicKeySign)
    except tink.TinkError as e:
      return testing_api_pb2.CreationResponse(err=str(e))
    return testing_api_pb2.CreationResponse()

  def CreatePublicKeyVerify(
      self, request: testing_api_pb2.CreationRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
    """Builds a PublicKeyVerify from the request keyset and discards it.

    Args:
      request: Carries the binary keyset in
        `annotated_keyset.serialized_keyset`.
      context: Not read by this method.

    Returns:
      An empty CreationResponse on success, or one with `err` set to the
      message of the TinkError that occurred.
    """
    serialized = request.annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      handle = cleartext_keyset_handle.read(reader)
      handle.primitive(signature.PublicKeyVerify)
    except tink.TinkError as e:
      return testing_api_pb2.CreationResponse(err=str(e))
    return testing_api_pb2.CreationResponse()

  def Sign(
      self, request: testing_api_pb2.SignatureSignRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.SignatureSignResponse:
    """Signs `data` using the private keyset.

    Args:
      request: Carries the private keyset and the data to sign.
      context: Not read by this method.

    Returns:
      A response with `signature` set, or with `err` set to the message of
      the TinkError that occurred.
    """
    serialized = request.private_annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      private_handle = cleartext_keyset_handle.read(reader)
      signer = private_handle.primitive(signature.PublicKeySign)
      signed = signer.sign(request.data)
    except tink.TinkError as e:
      return testing_api_pb2.SignatureSignResponse(err=str(e))
    return testing_api_pb2.SignatureSignResponse(signature=signed)

  def Verify(
      self, request: testing_api_pb2.SignatureVerifyRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.SignatureVerifyResponse:
    """Verifies `signature` over `data` using the public keyset.

    Args:
      request: Carries the public keyset, the signature and the data.
      context: Not read by this method.

    Returns:
      An empty response when verification raises no TinkError, otherwise a
      response with `err` set to the error message.
    """
    serialized = request.public_annotated_keyset.serialized_keyset
    try:
      reader = tink.BinaryKeysetReader(serialized)
      public_handle = cleartext_keyset_handle.read(reader)
      verifier = public_handle.primitive(signature.PublicKeyVerify)
      verifier.verify(request.signature, request.data)
    except tink.TinkError as e:
      return testing_api_pb2.SignatureVerifyResponse(err=str(e))
    return testing_api_pb2.SignatureVerifyResponse()
| |
| |
class PrfSetServicer(testing_api_pb2_grpc.PrfSetServicer):
  """A service for testing PrfSet."""

  def Create(self, request: testing_api_pb2.CreationRequest,
             context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
    """Creates a PrfSet without using it.

    Args:
      request: Carries the binary keyset in
        `annotated_keyset.serialized_keyset`.
      context: Not read by this method.

    Returns:
      An empty CreationResponse on success, or one with `err` set to the
      message of the TinkError that occurred.
    """
    try:
      keyset_handle = cleartext_keyset_handle.read(
          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
      keyset_handle.primitive(prf.PrfSet)
      return testing_api_pb2.CreationResponse()
    except tink.TinkError as e:
      return testing_api_pb2.CreationResponse(err=str(e))

  def KeyIds(
      self, request: testing_api_pb2.PrfSetKeyIdsRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.PrfSetKeyIdsResponse:
    """Returns all key IDs and the primary key ID.

    Args:
      request: Carries the binary keyset in
        `annotated_keyset.serialized_keyset`.
      context: Not read by this method.

    Returns:
      A response whose `output` holds `primary_key_id` and every `key_id` of
      the PrfSet, or a response with `err` set to the message of the
      TinkError that occurred.
    """
    try:
      keyset_handle = cleartext_keyset_handle.read(
          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
      p = keyset_handle.primitive(prf.PrfSet)
      prfs = p.all()
      response = testing_api_pb2.PrfSetKeyIdsResponse()
      response.output.primary_key_id = p.primary_id()
      response.output.key_id.extend(prfs.keys())
      return response
    except tink.TinkError as e:
      return testing_api_pb2.PrfSetKeyIdsResponse(err=str(e))

  def Compute(
      self, request: testing_api_pb2.PrfSetComputeRequest,
      context: grpc.ServicerContext) -> testing_api_pb2.PrfSetComputeResponse:
    """Computes the output of one PRF.

    Args:
      request: Carries the keyset, the `key_id` selecting the PRF, the
        `input_data` and the requested `output_length`.
      context: Not read by this method.

    Returns:
      A response with `output` set to the PRF output, or with `err` set when
      `key_id` is not part of the PrfSet or a TinkError occurs.
    """
    try:
      keyset_handle = cleartext_keyset_handle.read(
          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
      prfs = keyset_handle.primitive(prf.PrfSet).all()
      # An unknown key ID is a caller error; report it through `err` instead
      # of letting the lookup raise an uncaught KeyError.
      if request.key_id not in prfs:
        return testing_api_pb2.PrfSetComputeResponse(
            err='unknown key ID: %d' % request.key_id)
      output = prfs[request.key_id].compute(request.input_data,
                                            request.output_length)
      return testing_api_pb2.PrfSetComputeResponse(output=output)
    except tink.TinkError as e:
      return testing_api_pb2.PrfSetComputeResponse(err=str(e))