blob: 0a3be2d732b122ca08086fd51a07f9d527d16bba [file] [log] [blame]
"""
Open Quantum Safe (OQS) Python Wrapper for liboqs
The liboqs project provides post-quantum public key cryptography algorithms:
https://github.com/open-quantum-safe/liboqs
This module provides a Python 3 interface to liboqs.
"""
import os
import sys
# import ctypes to call native
import ctypes as ct
import ctypes.util as ctu
# import platform to learn the OS we're on
import platform
LIBOQS_SHARED_OBJ = 'oqs' if platform.system() == 'Windows' else 'liboqs.so'
LIBOQS_INSTALL_PATH = 'LIBOQS_INSTALL_PATH'
# expected return value from native OQS functions
_OQS_SUCCESS = 0
def _load_shared_obj():
"""Attempts to load native OQS library."""
paths = []
# try custom env var first
if LIBOQS_INSTALL_PATH in os.environ:
paths.append(os.environ[LIBOQS_INSTALL_PATH])
# search typical locations too
paths += [ctu.find_library('oqs'), os.path.join(os.curdir, LIBOQS_SHARED_OBJ)]
dll = ct.windll if platform.system() == 'Windows' else ct.cdll
for path in paths:
if path and os.path.exists(path):
lib = dll.LoadLibrary(path)
return lib
raise RuntimeError('No liboqs.so found!')
try:
# TODO: fix module 'global' liboqs
liboqs = _load_shared_obj()
assert liboqs
except OSError as err:
sys.exit('Could not load liboqs shared library')
except RuntimeError as err:
sys.exit('No liboqs shared libraries found')
class MechanismNotSupportedError(Exception):
"""Exception raised when an algorithm is not supported by OQS."""
def __init__(self, alg_name):
"""
:param alg_name: requested algorithm name
"""
self.alg_name = alg_name
self.message = alg_name + ' is not supported by OQS'
class MechanismNotEnabledError(MechanismNotSupportedError):
"""
Exception raised when an algorithm is not supported but not enabled by OQS.
"""
def __init__(self, alg_name):
"""
:param alg_name: requested algorithm name
"""
self.alg_name = alg_name
self.message = alg_name + ' is not supported but not enabled by OQS'
class KeyEncapsulation(ct.Structure):
"""
An OQS KeyEncapsulation wraps native/C liboqs OQS_KEM structs.
The wrapper maps methods to the C equivalent as follows:
Python | C liboqs
-------------------------------
generate_keypair | keypair
encap_secret | encaps
decap_secret | decaps
free | OQS_KEM_free
"""
_fields_ = [
("method_name", ct.c_char_p),
("alg_version", ct.c_char_p),
("claimed_nist_level", ct.c_ubyte),
("ind_cca", ct.c_ubyte),
("length_public_key", ct.c_size_t),
("length_secret_key", ct.c_size_t),
("length_ciphertext", ct.c_size_t),
("length_shared_secret", ct.c_size_t),
("keypair_cb", ct.c_void_p),
("encaps_cb", ct.c_void_p),
("decaps_cb", ct.c_void_p)
]
def __init__(self, alg_name, secret_key=None):
"""
Create new KeyEncapsulation with the given algorithm.
:param alg_name: KEM mechanism algorithm name
:param secret_key: optional if generating by generate_keypair() later
"""
self.alg_name = alg_name
if alg_name not in _enabled_KEMs:
# perhaps it's a supported but not enabled alg
if alg_name in _supported_KEMs:
raise MechanismNotEnabledError(alg_name)
else:
raise MechanismNotSupportedError(alg_name)
self._kem = liboqs.OQS_KEM_new(ct.create_string_buffer(alg_name.encode()))
self.details = {
'name': self._kem.contents.method_name.decode(),
'version': self._kem.contents.alg_version.decode(),
'claimed_nist_level': int(self._kem.contents.claimed_nist_level),
'is_ind_cca' : bool(self._kem.contents.ind_cca),
'length_public_key': int(self._kem.contents.length_public_key),
'length_secret_key': int(self._kem.contents.length_secret_key),
'length_ciphertext': int(self._kem.contents.length_ciphertext),
'length_shared_secret': int(self._kem.contents.length_shared_secret)}
if secret_key:
self.secret_key = ct.create_string_buffer(secret_key, self._kem.contents.length_secret_key)
def __enter__(self):
return self
def __exit__(self, ctx_type, ctx_value, ctx_traceback):
self.free()
def generate_keypair(self):
"""
Generates a new keypair and returns the public key.
If needed, the secret key can be obtained with export_secret_key().
"""
public_key = ct.create_string_buffer(self._kem.contents.length_public_key)
self.secret_key = ct.create_string_buffer(self._kem.contents.length_secret_key)
rv = liboqs.OQS_KEM_keypair(self._kem, ct.byref(public_key), ct.byref(self.secret_key))
return bytes(public_key) if rv == _OQS_SUCCESS else 0
def export_secret_key(self):
"""Exports the secret key."""
return bytes(self.secret_key)
def encap_secret(self, public_key):
"""
Generates and encapsulates a secret using the provided public key.
:param public_key: the peer's public key.
"""
my_public_key = ct.create_string_buffer(public_key, self._kem.contents.length_public_key)
ciphertext = ct.create_string_buffer(self._kem.contents.length_ciphertext)
shared_secret = ct.create_string_buffer(self._kem.contents.length_shared_secret)
rv = liboqs.OQS_KEM_encaps(self._kem, ct.byref(ciphertext), ct.byref(shared_secret), my_public_key)
return bytes(ciphertext), bytes(shared_secret) if rv == _OQS_SUCCESS else 0
def decap_secret(self, ciphertext):
"""
Decapsulates the ciphertext and returns the secret.
:param ciphertext: the ciphertext received from the peer.
"""
my_ciphertext = ct.create_string_buffer(ciphertext, self._kem.contents.length_ciphertext)
shared_secret = ct.create_string_buffer(self._kem.contents.length_shared_secret)
rv = liboqs.OQS_KEM_decaps(self._kem, ct.byref(shared_secret), my_ciphertext, self.secret_key)
return bytes(shared_secret) if rv == _OQS_SUCCESS else 0
def free(self):
"""Releases the native resources."""
liboqs.OQS_KEM_free(self._kem)
if hasattr(self, 'secret_key'):
liboqs.OQS_MEM_cleanse(ct.byref(self.secret_key), self._kem.contents.length_secret_key)
def __repr__(self):
return "Key encapsulation mechanism: " + self._kem.contents.method_name.decode()
liboqs.OQS_KEM_new.restype = ct.POINTER(KeyEncapsulation)
liboqs.OQS_KEM_alg_identifier.restype = ct.c_char_p
def is_KEM_enabled(alg_name):
"""Returns True if the KEM algorithm is enabled.
:param alg_name: a KEM mechanism algorithm name
"""
try:
kem = liboqs.OQS_KEM_new(ct.create_string_buffer(alg_name.encode()))
if kem.contents:
liboqs.OQS_KEM_free(kem)
return True
except ValueError:
pass
return False
_KEM_alg_ids = [liboqs.OQS_KEM_alg_identifier(i) for i in range(liboqs.OQS_KEM_alg_count())]
_supported_KEMs = [i.decode() for i in _KEM_alg_ids]
_enabled_KEMs = [i for i in _supported_KEMs if is_KEM_enabled(i)]
def get_enabled_KEM_mechanisms():
"""Returns the list of enabled KEM mechanisms."""
return _enabled_KEMs
def get_supported_KEM_mechanisms():
"""Returns list of supported KEM mechanisms."""
return _supported_KEMs
class Signature(ct.Structure):
"""
An OQS Signature wraps native/C liboqs OQS_SIG structs.
The wrapper maps methods to the C equivalent as follows:
Python | C liboqs
-------------------------------
generate_keypair | keypair
sign | sign
verify | verify
free | OQS_SIG_free
"""
_fields_ = [
("method_name", ct.c_char_p),
("alg_version", ct.c_char_p),
("claimed_nist_level", ct.c_ubyte),
("euf_cma", ct.c_ubyte),
("length_public_key", ct.c_size_t),
("length_secret_key", ct.c_size_t),
("length_signature", ct.c_size_t),
("keypair_cb", ct.c_void_p),
("sign_cb", ct.c_void_p),
("verify_cb", ct.c_void_p)
]
def __init__(self, alg_name, secret_key = None):
"""
Create new Signature with the given algorithm.
:param alg_name: a signature mechanism algorithm name. Enabled signature
mechanisms can be obtained with get_enabled_KEM_mechanisms().
:param secret_key: optional, if generated by generate_keypair()
"""
if alg_name not in _enabled_sigs:
# perhaps it's a supported but not enabled alg
if alg_name in _supported_sigs:
raise MechanismNotEnabledError(alg_name)
else:
raise MechanismNotSupportedError(alg_name)
self._sig = liboqs.OQS_SIG_new( ct.create_string_buffer(alg_name.encode()))
self.details = {
'name': self._sig.contents.method_name.decode(),
'version': self._sig.contents.alg_version.decode(),
'claimed_nist_level': int(self._sig.contents.claimed_nist_level),
'is_euf_cma': bool(self._sig.contents.euf_cma),
'length_public_key': int(self._sig.contents.length_public_key),
'length_secret_key': int(self._sig.contents.length_secret_key),
'length_signature': int(self._sig.contents.length_signature)}
if secret_key:
self.secret_key = ct.create_string_buffer(secret_key, self._sig.contents.length_secret_key)
def __enter__(self):
return self
def __exit__(self, ctx_type, ctx_value, ctx_traceback):
self.free()
def generate_keypair(self):
"""
Generates a new keypair and returns the public key.
If needed, the secret key can be obtained with export_secret_key().
"""
public_key = ct.create_string_buffer(self._sig.contents.length_public_key)
self.secret_key = ct.create_string_buffer(self._sig.contents.length_secret_key)
rv = liboqs.OQS_SIG_keypair(self._sig, ct.byref(public_key), ct.byref(self.secret_key))
return bytes(public_key) if rv == _OQS_SUCCESS else 0
def export_secret_key(self):
"""Exports the secret key."""
return bytes(self.secret_key)
def sign(self, message):
"""
Signs the provided message and returns the signature.
:param message: the message to sign.
"""
# provide length to avoid extra null char
my_message = ct.create_string_buffer(message, len(message))
message_len = ct.c_int(len(my_message))
signature = ct.create_string_buffer(self._sig.contents.length_signature)
sig_len = ct.c_int(self._sig.contents.length_signature) # initialize to maximum signature size
rv = liboqs.OQS_SIG_sign(self._sig, ct.byref(signature),
ct.byref(sig_len), my_message,
message_len, self.secret_key)
return bytes(signature[:sig_len.value]) if rv == _OQS_SUCCESS else 0
def verify(self, message, signature, public_key):
"""
Verifies the provided signature on the message; returns True if valid.
:param message: the signed message.
:param signature: the signature on the message.
:param public_key: the signer's public key.
"""
# provide length to avoid extra null char
my_message = ct.create_string_buffer(message, len(message))
message_len = ct.c_int(len(my_message))
# provide length to avoid extra null char in sig
my_signature = ct.create_string_buffer(signature, len(signature))
sig_len = ct.c_int(len(my_signature))
my_public_key = ct.create_string_buffer(public_key, self._sig.contents.length_public_key)
rv = liboqs.OQS_SIG_verify(self._sig, my_message, message_len,
my_signature, sig_len, my_public_key)
return True if rv == _OQS_SUCCESS else False
def free(self):
"""Releases the native resources."""
liboqs.OQS_SIG_free(self._sig)
if hasattr(self, 'secret_key'):
liboqs.OQS_MEM_cleanse(ct.byref(self.secret_key), self._sig.contents.length_secret_key)
def __repr__(self):
return "Signature mechanism: " + self._sig.contents.method_name.decode()
liboqs.OQS_SIG_new.restype = ct.POINTER(Signature)
liboqs.OQS_SIG_alg_identifier.restype = ct.c_char_p
def is_sig_enabled(alg_name):
"""Returns True if the signature algorithm is enabled.
:param alg_name: a signature mechanism algorithm name
"""
try:
sig = liboqs.OQS_SIG_new(ct.create_string_buffer(alg_name.encode()))
if sig.contents:
liboqs.OQS_SIG_free(sig)
return True
except ValueError:
pass
return False
_sig_alg_ids = [liboqs.OQS_SIG_alg_identifier(i) for i in range(liboqs.OQS_SIG_alg_count())]
_supported_sigs = [i.decode() for i in _sig_alg_ids]
_enabled_sigs = [i for i in _supported_sigs if is_sig_enabled(i)]
def get_enabled_sig_mechanisms():
"""Returns the list of enabled signature mechanisms."""
return _enabled_sigs
def get_supported_sig_mechanisms():
"""Returns list of supported signature mechanisms."""
return _supported_sigs