Skip to content

Commit

Permalink
Added logger as parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
msetina committed May 2, 2024
1 parent 2bc702d commit ba783a4
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 20 deletions.
25 changes: 18 additions & 7 deletions pkcs11_cryptography_keys/pkcs11_URI/pkcs11_URI.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from logging import Logger
from re import compile
from typing import Any
from urllib.parse import unquote
Expand Down Expand Up @@ -29,7 +30,12 @@


class PKCS11URI(object):
def __init__(self, location: dict[str, str], query: dict[str, str]) -> None:
def __init__(
self,
location: dict[str, str],
query: dict[str, str],
logger: Logger | None = None,
) -> None:
self._PKCS11_key_translation = {
"object": (CKA_LABEL, self.__get_object_value),
"id": (CKA_ID, self.__get_id_value),
Expand All @@ -38,6 +44,7 @@ def __init__(self, location: dict[str, str], query: dict[str, str]) -> None:
self._location: dict[str, str] = location
self._query: dict[str, str] = query
self._operations: list[tuple[int, str]] = []
self._logger = logger if logger is not None else Logger("PKCS11 uri")

def __get_object_value(self, value: str):
return value
Expand Down Expand Up @@ -81,7 +88,9 @@ def __get_type_value(self, value: str):
def parse(
cls,
uri: str,
logger: Logger | None,
) -> PKCS11URI:
local_logger = logger if logger is not None else Logger("URI parser")
grob = compile("(.+?)(\?.+?)?(#.+)?$")
m = grob.match(uri)
if m is not None:
Expand All @@ -106,7 +115,7 @@ def parse(
location[rest[0:b]] = unquote(rest[b + 1 : a])
rest = rest[a + 1 :]
else:
return cls({}, {})
return cls({}, {}, local_logger)
if g[1] is not None:
if g[1].startswith("?"):
rest = g[1][1:]
Expand All @@ -124,7 +133,7 @@ def parse(

else:
raise Exception("Bad query in URI")
return cls(location, query)
return cls(location, query, local_logger)
else:
raise Exception("URI was not parsed properly")
else:
Expand Down Expand Up @@ -254,8 +263,9 @@ def get_key(self, session: Session) -> tuple[bytes | None, int | None, Any]:
objs = session.findObjects(template)
if objs is not None and len(objs) > 0:
if len(objs) > 1:
# TODO: write to log that we got more than one key
print("There is more to what meets the eye.")
self._logger.info(
"There multiple keys with provided URI description."
)
key = objs[0]
attrs = session.getAttributeValue(key, [CKA_KEY_TYPE, CKA_ID])
key_type = attrs[0]
Expand Down Expand Up @@ -297,8 +307,9 @@ def get_private_key(
objs = session.findObjects(template)
if objs is not None and len(objs) > 0:
if len(objs) > 1:
# TODO: write to log that we got more than one key
print("There is more to what meets the eye.")
self._logger.info(
"There multiple keys with provided URI description."
)
key = objs[0]
attrs = session.getAttributeValue(
key, [CKA_KEY_TYPE, CKA_ID, CKA_LABEL]
Expand Down
5 changes: 4 additions & 1 deletion pkcs11_cryptography_keys/sessions/PKCS11_admin_session.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from logging import Logger

from PyKCS11 import (
CKA_CLASS,
CKA_ID,
Expand Down Expand Up @@ -28,8 +30,9 @@ def __init__(
norm_user: bool = False,
key_label: str | None = None,
key_id: bytes | None = None,
logger: Logger | None = None,
):
super().__init__()
super().__init__(logger)
self._key_id = key_id
self._norm_user = norm_user
self._pksc11_lib = pksc11_lib
Expand Down
4 changes: 3 additions & 1 deletion pkcs11_cryptography_keys/sessions/PKCS11_key_session.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from importlib import import_module
from logging import Logger

from PyKCS11 import (
CKA_CLASS,
Expand Down Expand Up @@ -34,8 +35,9 @@ def __init__(
token_label: str,
pin: str,
key_label: str | None = None,
logger: Logger | None = None,
):
super().__init__()
super().__init__(logger)
self._key_label = key_label
self._pksc11_lib = pksc11_lib
self._token_label = token_label
Expand Down
26 changes: 24 additions & 2 deletions pkcs11_cryptography_keys/sessions/PKCS11_session.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,39 @@
from logging import Logger


# contextmanager to facilitate connecting to card token
class PKCS11Session(object):
def __init__(self):
def __init__(self, logger: Logger | None):
self._logger = logger if logger is not None else Logger("PKCS11 sesion")
# session for interacton with the card
self._session = None
# does user need to be logged in to use session
self._login_required = False

def __exit__(self, exc_type, exc_value, exc_traceback):
ret = False
self.close()
if exc_type is not None:
self._logger.error(
"PKCS11 session experience an error : %s",
exc_value,
exc_info=True,
)
else:
self._logger.info("PKCS11 session exited successfully")
return ret

async def __aexit__(self, exc_type, exc_value, traceback):
async def __aexit__(self, exc_type, exc_value, exc_traceback):
ret = False
self.close()
if exc_type is not None:
self._logger.error(
"PKCS11 session experience an error : %s",
exc_value,
exc_info=True,
)
else:
self._logger.info("PKCS11 session exited successfully")
return ret

# Closing work on an open session
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from logging import Logger

from PyKCS11 import (
CKF_LOGIN_REQUIRED,
CKF_RW_SESSION,
Expand All @@ -20,8 +22,9 @@ def __init__(
token_label: str,
pin: str,
norm_user: bool = False,
logger: Logger | None = None,
):
super().__init__()
super().__init__(logger)
self._pksc11_lib = pksc11_lib
self._token_label = token_label
self._pin = pin
Expand Down
12 changes: 10 additions & 2 deletions pkcs11_cryptography_keys/sessions/PKCS11_slot_session.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from logging import Logger

from PyKCS11 import (
CKF_LOGIN_REQUIRED,
CKF_RW_SESSION,
Expand All @@ -13,8 +15,14 @@

# contextmanager to facilitate connecting to source
class PKCS11SlotSession(PKCS11Session):
def __init__(self, pksc11_lib: str, token_label: str, pin: str):
super().__init__()
def __init__(
self,
pksc11_lib: str,
token_label: str,
pin: str,
logger: Logger | None = None,
):
super().__init__(logger)
self._pksc11_lib = pksc11_lib
self._token_label = token_label
self._pin = pin
Expand Down
7 changes: 5 additions & 2 deletions pkcs11_cryptography_keys/sessions/PKCS11_uri_admin_session.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from logging import Logger

from PyKCS11 import (
CKA_CLASS,
CKA_ID,
Expand Down Expand Up @@ -27,8 +29,9 @@ def __init__(
uri: str,
norm_user: bool = False,
pin_getter: Pin4Token | None = None,
logger: Logger | None = None,
):
super().__init__()
super().__init__(logger)
self._norm_user = norm_user
self._uri = uri
self._pin_getter = pin_getter
Expand Down Expand Up @@ -67,7 +70,7 @@ def _get_private_key_info(self, key_label: str | None = None) -> tuple:
# Open session with the card
# Uses pin if needed, reads permited operations(mechanisms)
def open(self) -> PKCS11TokenAdmin | None:
pkcs11_uri = PKCS11URI.parse(self._uri)
pkcs11_uri = PKCS11URI.parse(self._uri, self._logger)
self._login_required = False
self._session = pkcs11_uri.get_session(
self._norm_user, self._pin_getter
Expand Down
6 changes: 4 additions & 2 deletions pkcs11_cryptography_keys/sessions/PKCS11_uri_key_session.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from importlib import import_module
from logging import Logger

from PyKCS11 import (
CKA_CLASS,
Expand Down Expand Up @@ -34,8 +35,9 @@ def __init__(
self,
uri: str,
pin_getter: Pin4Token | None = None,
logger: Logger | None = None,
):
super().__init__()
super().__init__(logger)
self._uri = uri
self._pin_getter = pin_getter

Expand Down Expand Up @@ -69,7 +71,7 @@ def open(
self,
) -> EllipticCurvePrivateKeyPKCS11 | RSAPrivateKeyPKCS11 | None:
private_key = None
pkcs11_uri = PKCS11URI.parse(self._uri)
pkcs11_uri = PKCS11URI.parse(self._uri, self._logger)
self._login_required = False
self._session = pkcs11_uri.get_session(pin_getter=self._pin_getter)
if self._session is not None:
Expand Down
11 changes: 9 additions & 2 deletions pkcs11_cryptography_keys/utils/listers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from logging import Logger

import PyKCS11

from pkcs11_cryptography_keys.sessions.PKCS11_admin_session import (
Expand All @@ -6,15 +8,20 @@


# Support function to list admin sessions
def list_token_admins(pksc11_lib: str, pin: str, norm_user: bool = False):
def list_token_admins(
pksc11_lib: str,
pin: str,
norm_user: bool = False,
logger: Logger | None = None,
):
library = PyKCS11.PyKCS11Lib()
library.load(pksc11_lib)
slots = library.getSlotList(tokenPresent=True)
for sl in slots:
ti = library.getTokenInfo(sl)
if ti.flags & PyKCS11.CKF_TOKEN_INITIALIZED != 0:
yield PKCS11AdminSession(
pksc11_lib, ti.label.strip(), pin, norm_user
pksc11_lib, ti.label.strip(), pin, norm_user, logger=logger
)


Expand Down

0 comments on commit ba783a4

Please sign in to comment.