diff --git a/pkcs11_cryptography_keys/pkcs11_URI/pkcs11_URI.py b/pkcs11_cryptography_keys/pkcs11_URI/pkcs11_URI.py index 81b2d7a..45a743f 100644 --- a/pkcs11_cryptography_keys/pkcs11_URI/pkcs11_URI.py +++ b/pkcs11_cryptography_keys/pkcs11_URI/pkcs11_URI.py @@ -1,5 +1,6 @@ from __future__ import annotations +from logging import Logger from re import compile from typing import Any from urllib.parse import unquote @@ -29,7 +30,12 @@ class PKCS11URI(object): - def __init__(self, location: dict[str, str], query: dict[str, str]) -> None: + def __init__( + self, + location: dict[str, str], + query: dict[str, str], + logger: Logger | None = None, + ) -> None: self._PKCS11_key_translation = { "object": (CKA_LABEL, self.__get_object_value), "id": (CKA_ID, self.__get_id_value), @@ -38,6 +44,7 @@ def __init__(self, location: dict[str, str], query: dict[str, str]) -> None: self._location: dict[str, str] = location self._query: dict[str, str] = query self._operations: list[tuple[int, str]] = [] + self._logger = logger if logger is not None else Logger("PKCS11 uri") def __get_object_value(self, value: str): return value @@ -81,7 +88,9 @@ def __get_type_value(self, value: str): def parse( cls, uri: str, + logger: Logger | None, ) -> PKCS11URI: + local_logger = logger if logger is not None else Logger("URI parser") grob = compile("(.+?)(\?.+?)?(#.+)?$") m = grob.match(uri) if m is not None: @@ -106,7 +115,7 @@ def parse( location[rest[0:b]] = unquote(rest[b + 1 : a]) rest = rest[a + 1 :] else: - return cls({}, {}) + return cls({}, {}, local_logger) if g[1] is not None: if g[1].startswith("?"): rest = g[1][1:] @@ -124,7 +133,7 @@ def parse( else: raise Exception("Bad query in URI") - return cls(location, query) + return cls(location, query, local_logger) else: raise Exception("URI was not parsed properly") else: 
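Review bot: The fallback `Logger("PKCS11 uri")` on the new `self._logger` line builds a logger outside the logging hierarchy. A directly instantiated `logging.Logger` has no parent and no handlers, so the application's logging configuration never reaches it. With default settings its INFO records are dropped, and only WARNING and above reach stderr through the last-resort handler. The logging documentation asks for loggers to be obtained through `logging.getLogger`, so I would write `self._logger = logger if logger is not None else logging.getLogger(__name__)` and add `import logging`. The same fallback appears in `parse` (`Logger("URI parser")`) and in `PKCS11Session.__init__` (`Logger("PKCS11 sesion")`); `__init__` starts outside this hunk.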
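Review bot: `parse` now takes `logger: Logger | None,` with no default, so any existing caller that still uses `PKCS11URI.parse(uri)` fails with a TypeError. The two in-tree callers are updated in this commit, but `parse` is a public classmethod, and `__init__` in the same file already defaults the parameter to `None`. I would make the signatures agree with `logger: Logger | None = None,`. The body of `parse` continues outside this hunk.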
@@ -254,8 +263,9 @@ def get_key(self, session: Session) -> tuple[bytes | None, int | None, Any]: objs = session.findObjects(template) if objs is not None and len(objs) > 0: if len(objs) > 1: - # TODO: write to log that we got more than one key - print("There is more to what meets the eye.") + self._logger.info( + "There multiple keys with provided URI description." + ) key = objs[0] attrs = session.getAttributeValue(key, [CKA_KEY_TYPE, CKA_ID]) key_type = attrs[0] @@ -297,8 +307,9 @@ def get_private_key( objs = session.findObjects(template) if objs is not None and len(objs) > 0: if len(objs) > 1: - # TODO: write to log that we got more than one key - print("There is more to what meets the eye.") + self._logger.info( + "There multiple keys with provided URI description." + ) key = objs[0] attrs = session.getAttributeValue( key, [CKA_KEY_TYPE, CKA_ID, CKA_LABEL] diff --git a/pkcs11_cryptography_keys/sessions/PKCS11_admin_session.py b/pkcs11_cryptography_keys/sessions/PKCS11_admin_session.py index 8fb067c..e9548dc 100644 --- a/pkcs11_cryptography_keys/sessions/PKCS11_admin_session.py +++ b/pkcs11_cryptography_keys/sessions/PKCS11_admin_session.py @@ -1,3 +1,5 @@ +from logging import Logger + from PyKCS11 import ( CKA_CLASS, CKA_ID, @@ -28,8 +30,9 @@ def __init__( norm_user: bool = False, key_label: str | None = None, key_id: bytes | None = None, + logger: Logger | None = None, ): - super().__init__() + super().__init__(logger) self._key_id = key_id self._norm_user = norm_user self._pksc11_lib = pksc11_lib diff --git a/pkcs11_cryptography_keys/sessions/PKCS11_key_session.py b/pkcs11_cryptography_keys/sessions/PKCS11_key_session.py index d872190..2fa0972 100644 --- a/pkcs11_cryptography_keys/sessions/PKCS11_key_session.py +++ b/pkcs11_cryptography_keys/sessions/PKCS11_key_session.py @@ -1,4 +1,5 @@ from importlib import import_module +from logging import Logger from PyKCS11 import ( CKA_CLASS, 
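Review bot: The multiple-match branch in `get_key` still falls through to `key = objs[0]`, so when a URI matches more than one object, the key used depends on the order in which the token returns them. The only trace is an INFO record, which most configurations suppress. For a key-selection ambiguity I would log at warning level and include the match count, e.g. `self._logger.warning("URI matched %d keys; using the first one", len(objs))`, and consider letting the caller request a hard failure instead. The message text is also missing a verb ("There are multiple keys …"). The identical block in the `get_private_key` hunk needs the same change; both function bodies extend beyond their hunks.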
@@ -34,8 +35,9 @@ def __init__( token_label: str, pin: str, key_label: str | None = None, + logger: Logger | None = None, ): - super().__init__() + super().__init__(logger) self._key_label = key_label self._pksc11_lib = pksc11_lib self._token_label = token_label diff --git a/pkcs11_cryptography_keys/sessions/PKCS11_session.py b/pkcs11_cryptography_keys/sessions/PKCS11_session.py index 685b346..377fdbb 100644 --- a/pkcs11_cryptography_keys/sessions/PKCS11_session.py +++ b/pkcs11_cryptography_keys/sessions/PKCS11_session.py @@ -1,17 +1,39 @@ +from logging import Logger + + # contextmanager to facilitate connecting to card token class PKCS11Session(object): - def __init__(self): + def __init__(self, logger: Logger | None): + self._logger = logger if logger is not None else Logger("PKCS11 sesion") # session for interacton with the card self._session = None # does user need to be logged in to use session self._login_required = False def __exit__(self, exc_type, exc_value, exc_traceback): + ret = False self.close() + if exc_type is not None: + self._logger.error( + "PKCS11 session experience an error : %s", + exc_value, + exc_info=True, + ) + else: + self._logger.info("PKCS11 session exited successfully") + return ret - async def __aexit__(self, exc_type, exc_value, traceback): + async def __aexit__(self, exc_type, exc_value, exc_traceback): ret = False self.close() + if exc_type is not None: + self._logger.error( + "PKCS11 session experience an error : %s", + exc_value, + exc_info=True, + ) + else: + self._logger.info("PKCS11 session exited successfully") return ret # Closing work on an open session diff --git a/pkcs11_cryptography_keys/sessions/PKCS11_slot_admin_session.py b/pkcs11_cryptography_keys/sessions/PKCS11_slot_admin_session.py index e7f7f86..016c5e1 100644 --- a/pkcs11_cryptography_keys/sessions/PKCS11_slot_admin_session.py +++ b/pkcs11_cryptography_keys/sessions/PKCS11_slot_admin_session.py 
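Review bot: In both `__exit__` and `__aexit__` the new logging runs only after `self.close()` returns, so if closing the session raises, the error record for the exception from the `with` body is never written. Wrapping the close in `try`/`finally` and passing the exception triple explicitly keeps that record, and a single helper keeps the sync and async paths identical. `exc_value` is written to the log verbatim, so it is worth confirming that no error text raised inside a session can carry a PIN before relying on this output. `__init__` here also takes `logger` without a default, unlike the subclasses, which breaks any out-of-tree subclass that still calls `super().__init__()`.

```python
    def _log_exit(self, exc_type, exc_value, exc_traceback):
        if exc_type is not None:
            self._logger.error(
                "PKCS11 session experience an error : %s",
                exc_value,
                exc_info=(exc_type, exc_value, exc_traceback),
            )
        else:
            self._logger.info("PKCS11 session exited successfully")

    def __exit__(self, exc_type, exc_value, exc_traceback):
        try:
            self.close()
        finally:
            self._log_exit(exc_type, exc_value, exc_traceback)
        return False

    async def __aexit__(self, exc_type, exc_value, exc_traceback):
        try:
            self.close()
        finally:
            self._log_exit(exc_type, exc_value, exc_traceback)
        return False
    # … unchanged: close() and the rest of the class …
```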
@@ -1,3 +1,5 @@ +from logging import Logger + from PyKCS11 import ( CKF_LOGIN_REQUIRED, CKF_RW_SESSION, @@ -20,8 +22,9 @@ def __init__( token_label: str, pin: str, norm_user: bool = False, + logger: Logger | None = None, ): - super().__init__() + super().__init__(logger) self._pksc11_lib = pksc11_lib self._token_label = token_label self._pin = pin diff --git a/pkcs11_cryptography_keys/sessions/PKCS11_slot_session.py b/pkcs11_cryptography_keys/sessions/PKCS11_slot_session.py index 0291196..d067d8f 100644 --- a/pkcs11_cryptography_keys/sessions/PKCS11_slot_session.py +++ b/pkcs11_cryptography_keys/sessions/PKCS11_slot_session.py @@ -1,3 +1,5 @@ +from logging import Logger + from PyKCS11 import ( CKF_LOGIN_REQUIRED, CKF_RW_SESSION, @@ -13,8 +15,14 @@ # contextmanager to facilitate connecting to source class PKCS11SlotSession(PKCS11Session): - def __init__(self, pksc11_lib: str, token_label: str, pin: str): - super().__init__() + def __init__( + self, + pksc11_lib: str, + token_label: str, + pin: str, + logger: Logger | None = None, + ): + super().__init__(logger) self._pksc11_lib = pksc11_lib self._token_label = token_label self._pin = pin diff --git a/pkcs11_cryptography_keys/sessions/PKCS11_uri_admin_session.py b/pkcs11_cryptography_keys/sessions/PKCS11_uri_admin_session.py index 2817b3f..480ff47 100644 --- a/pkcs11_cryptography_keys/sessions/PKCS11_uri_admin_session.py +++ b/pkcs11_cryptography_keys/sessions/PKCS11_uri_admin_session.py @@ -1,3 +1,5 @@ +from logging import Logger + from PyKCS11 import ( CKA_CLASS, CKA_ID, @@ -27,8 +29,9 @@ def __init__( uri: str, norm_user: bool = False, pin_getter: Pin4Token | None = None, + logger: Logger | None = None, ): - super().__init__() + super().__init__(logger) self._norm_user = norm_user self._uri = uri self._pin_getter = pin_getter 
@@ -67,7 +70,7 @@ def _get_private_key_info(self, key_label: str | None = None) -> tuple: # Open session with the card # Uses pin if needed, reads permited operations(mechanisms) def open(self) -> PKCS11TokenAdmin | None: - pkcs11_uri = PKCS11URI.parse(self._uri) + pkcs11_uri = PKCS11URI.parse(self._uri, self._logger) self._login_required = False self._session = pkcs11_uri.get_session( self._norm_user, self._pin_getter diff --git a/pkcs11_cryptography_keys/sessions/PKCS11_uri_key_session.py b/pkcs11_cryptography_keys/sessions/PKCS11_uri_key_session.py index f49240d..d6db51c 100644 --- a/pkcs11_cryptography_keys/sessions/PKCS11_uri_key_session.py +++ b/pkcs11_cryptography_keys/sessions/PKCS11_uri_key_session.py @@ -1,4 +1,5 @@ from importlib import import_module +from logging import Logger from PyKCS11 import ( CKA_CLASS, @@ -34,8 +35,9 @@ def __init__( self, uri: str, pin_getter: Pin4Token | None = None, + logger: Logger | None = None, ): - super().__init__() + super().__init__(logger) self._uri = uri self._pin_getter = pin_getter @@ -69,7 +71,7 @@ def open( self, ) -> EllipticCurvePrivateKeyPKCS11 | RSAPrivateKeyPKCS11 | None: private_key = None - pkcs11_uri = PKCS11URI.parse(self._uri) + pkcs11_uri = PKCS11URI.parse(self._uri, self._logger) self._login_required = False self._session = pkcs11_uri.get_session(pin_getter=self._pin_getter) if self._session is not None: diff --git a/pkcs11_cryptography_keys/utils/listers.py b/pkcs11_cryptography_keys/utils/listers.py index f8e807d..eb6e4de 100644 --- a/pkcs11_cryptography_keys/utils/listers.py +++ b/pkcs11_cryptography_keys/utils/listers.py @@ -1,3 +1,5 @@ +from logging import Logger + import PyKCS11 from pkcs11_cryptography_keys.sessions.PKCS11_admin_session import ( 
@@ -6,7 +8,12 @@ # Support function to list admin sessions -def list_token_admins(pksc11_lib: str, pin: str, norm_user: bool = False): +def list_token_admins( + pksc11_lib: str, + pin: str, + norm_user: bool = False, + logger: Logger | None = None, +): library = PyKCS11.PyKCS11Lib() library.load(pksc11_lib) slots = library.getSlotList(tokenPresent=True) @@ -14,7 +21,7 @@ def list_token_admins(pksc11_lib: str, pin: str, norm_user: bool = False): ti = library.getTokenInfo(sl) if ti.flags & PyKCS11.CKF_TOKEN_INITIALIZED != 0: yield PKCS11AdminSession( - pksc11_lib, ti.label.strip(), pin, norm_user + pksc11_lib, ti.label.strip(), pin, norm_user, logger=logger )