Skip to content

Commit

Permalink
Add Ragger tests and remove obsolete functional tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
agrojean-ledger committed Nov 7, 2023
1 parent 8a9f430 commit 3b35d0d
Show file tree
Hide file tree
Showing 51 changed files with 854 additions and 35 deletions.
1 change: 0 additions & 1 deletion test/menu.apdu

This file was deleted.

1 change: 0 additions & 1 deletion test/overflow.apdu

This file was deleted.

1 change: 0 additions & 1 deletion test/quit.apdu

This file was deleted.

1 change: 0 additions & 1 deletion test/sign.apdu

This file was deleted.

31 changes: 0 additions & 31 deletions test/test_cmds.py

This file was deleted.

Empty file.
127 changes: 127 additions & 0 deletions tests/application_client/boilerplate_command_sender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from enum import IntEnum
from typing import Generator, List, Optional
from contextlib import contextmanager

from ragger.backend.interface import BackendInterface, RAPDU
from ragger.bip import pack_derivation_path


MAX_APDU_LEN: int = 255

CLA: int = 0xE0

class P1(IntEnum):
# Parameter 1 for first APDU number.
P1_START = 0x00
# Parameter 1 for maximum APDU number.
P1_MAX = 0x03
# Parameter 1 for screen confirmation for GET_PUBLIC_KEY.
P1_CONFIRM = 0x01

class P2(IntEnum):
# Parameter 2 for last APDU to receive.
P2_LAST = 0x00
# Parameter 2 for more APDU to receive.
P2_MORE = 0x80

class InsType(IntEnum):
GET_VERSION = 0x03
GET_APP_NAME = 0x04
GET_PUBLIC_KEY = 0x05
SIGN_TX = 0x06

class Errors(IntEnum):
SW_DENY = 0x6985
SW_WRONG_P1P2 = 0x6A86
SW_WRONG_DATA_LENGTH = 0x6A87
SW_INS_NOT_SUPPORTED = 0x6D00
SW_CLA_NOT_SUPPORTED = 0x6E00
SW_WRONG_RESPONSE_LENGTH = 0xB000
SW_DISPLAY_BIP32_PATH_FAIL = 0xB001
SW_DISPLAY_ADDRESS_FAIL = 0xB002
SW_DISPLAY_AMOUNT_FAIL = 0xB003
SW_WRONG_TX_LENGTH = 0xB004
SW_TX_PARSING_FAIL = 0xB005
SW_TX_HASH_FAIL = 0xB006
SW_BAD_STATE = 0xB007
SW_SIGNATURE_FAIL = 0xB008


def split_message(message: bytes, max_size: int) -> List[bytes]:
return [message[x:x + max_size] for x in range(0, len(message), max_size)]


class BoilerplateCommandSender:
def __init__(self, backend: BackendInterface) -> None:
self.backend = backend


def get_app_and_version(self) -> RAPDU:
return self.backend.exchange(cla=0xB0, # specific CLA for BOLOS
ins=0x01, # specific INS for get_app_and_version
p1=P1.P1_START,
p2=P2.P2_LAST,
data=b"")


def get_version(self) -> RAPDU:
return self.backend.exchange(cla=CLA,
ins=InsType.GET_VERSION,
p1=P1.P1_START,
p2=P2.P2_LAST,
data=b"")


def get_app_name(self) -> RAPDU:
return self.backend.exchange(cla=CLA,
ins=InsType.GET_APP_NAME,
p1=P1.P1_START,
p2=P2.P2_LAST,
data=b"")


def get_public_key(self, path: str) -> RAPDU:
return self.backend.exchange(cla=CLA,
ins=InsType.GET_PUBLIC_KEY,
p1=P1.P1_START,
p2=P2.P2_LAST,
data=pack_derivation_path(path))


@contextmanager
def get_public_key_with_confirmation(self, path: str) -> Generator[None, None, None]:
with self.backend.exchange_async(cla=CLA,
ins=InsType.GET_PUBLIC_KEY,
p1=P1.P1_CONFIRM,
p2=P2.P2_LAST,
data=pack_derivation_path(path)) as response:
yield response


@contextmanager
def sign_tx(self, path: str, transaction: bytes) -> Generator[None, None, None]:
self.backend.exchange(cla=CLA,
ins=InsType.SIGN_TX,
p1=P1.P1_START,
p2=P2.P2_MORE,
data=pack_derivation_path(path))
messages = split_message(transaction, MAX_APDU_LEN)
idx: int = P1.P1_START + 1

for msg in messages[:-1]:
self.backend.exchange(cla=CLA,
ins=InsType.SIGN_TX,
p1=idx,
p2=P2.P2_MORE,
data=msg)
idx += 1

with self.backend.exchange_async(cla=CLA,
ins=InsType.SIGN_TX,
p1=idx,
p2=P2.P2_LAST,
data=messages[-1]) as response:
yield response

def get_async_response(self) -> Optional[RAPDU]:
return self.backend.last_async_response
69 changes: 69 additions & 0 deletions tests/application_client/boilerplate_response_unpacker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from typing import Tuple
from struct import unpack

# remainder, data_len, data
def pop_sized_buf_from_buffer(buffer:bytes, size:int) -> Tuple[bytes, bytes]:
return buffer[size:], buffer[0:size]

# remainder, data_len, data
def pop_size_prefixed_buf_from_buf(buffer:bytes) -> Tuple[bytes, int, bytes]:
data_len = buffer[0]
return buffer[1+data_len:], data_len, buffer[1:data_len+1]

# Unpack from response:
# response = app_name (var)
def unpack_get_app_name_response(response: bytes) -> str:
return response.decode("ascii")

# Unpack from response:
# response = MAJOR (1)
# MINOR (1)
# PATCH (1)
def unpack_get_version_response(response: bytes) -> Tuple[int, int, int]:
assert len(response) == 3
major, minor, patch = unpack("BBB", response)
return (major, minor, patch)

# Unpack from response:
# response = format_id (1)
# app_name_raw_len (1)
# app_name_raw (var)
# version_raw_len (1)
# version_raw (var)
# unused_len (1)
# unused (var)
def unpack_get_app_and_version_response(response: bytes) -> Tuple[str, str]:
response, _ = pop_sized_buf_from_buffer(response, 1)
response, _, app_name_raw = pop_size_prefixed_buf_from_buf(response)
response, _, version_raw = pop_size_prefixed_buf_from_buf(response)
response, _, _ = pop_size_prefixed_buf_from_buf(response)

assert len(response) == 0

return app_name_raw.decode("ascii"), version_raw.decode("ascii")

# Unpack from response:
# response = pub_key_len (1)
# pub_key (var)
# chain_code_len (1)
# chain_code (var)
def unpack_get_public_key_response(response: bytes) -> Tuple[int, bytes, int, bytes]:
response, pub_key_len, pub_key = pop_size_prefixed_buf_from_buf(response)
response, chain_code_len, chain_code = pop_size_prefixed_buf_from_buf(response)

assert pub_key_len == 65
assert chain_code_len == 32
assert len(response) == 0
return pub_key_len, pub_key, chain_code_len, chain_code

# Unpack from response:
# response = der_sig_len (1)
# der_sig (var)
# v (1)
def unpack_sign_tx_response(response: bytes) -> Tuple[int, bytes, int]:
response, der_sig_len, der_sig = pop_size_prefixed_buf_from_buf(response)
response, v = pop_sized_buf_from_buffer(response, 1)

assert len(response) == 0

return der_sig_len, der_sig, int.from_bytes(v, byteorder='big')
52 changes: 52 additions & 0 deletions tests/application_client/boilerplate_transaction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from io import BytesIO
from typing import Union

from .boilerplate_utils import read, read_uint, read_varint, write_varint, UINT64_MAX


class TransactionError(Exception):
pass


class Transaction:
def __init__(self,
nonce: int,
to: Union[str, bytes],
value: int,
memo: str,
do_check: bool = True) -> None:
self.nonce: int = nonce
self.to: bytes = bytes.fromhex(to[2:]) if isinstance(to, str) else to
self.value: int = value
self.memo: bytes = memo.encode("ascii")

if do_check:
if not 0 <= self.nonce <= UINT64_MAX:
raise TransactionError(f"Bad nonce: '{self.nonce}'!")

if not 0 <= self.value <= UINT64_MAX:
raise TransactionError(f"Bad value: '{self.value}'!")

if len(self.to) != 20:
raise TransactionError(f"Bad address: '{self.to.hex()}'!")

def serialize(self) -> bytes:
return b"".join([
self.nonce.to_bytes(8, byteorder="big"),
self.to,
self.value.to_bytes(8, byteorder="big"),
write_varint(len(self.memo)),
self.memo
])

@classmethod
def from_bytes(cls, hexa: Union[bytes, BytesIO]):
buf: BytesIO = BytesIO(hexa) if isinstance(hexa, bytes) else hexa

nonce: int = read_uint(buf, 64, byteorder="big")
to: bytes = read(buf, 20)
value: int = read_uint(buf, 64, byteorder="big")
memo_len: int = read_varint(buf)
memo: str = read(buf, memo_len).decode("ascii")

return cls(nonce=nonce, to=to, value=value, memo=memo)
61 changes: 61 additions & 0 deletions tests/application_client/boilerplate_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from io import BytesIO
from typing import Optional, Literal


UINT64_MAX: int = 2**64-1
UINT32_MAX: int = 2**32-1
UINT16_MAX: int = 2**16-1


def write_varint(n: int) -> bytes:
if n < 0xFC:
return n.to_bytes(1, byteorder="little")

if n <= UINT16_MAX:
return b"\xFD" + n.to_bytes(2, byteorder="little")

if n <= UINT32_MAX:
return b"\xFE" + n.to_bytes(4, byteorder="little")

if n <= UINT64_MAX:
return b"\xFF" + n.to_bytes(8, byteorder="little")

raise ValueError(f"Can't write to varint: '{n}'!")


def read_varint(buf: BytesIO,
prefix: Optional[bytes] = None) -> int:
b: bytes = prefix if prefix else buf.read(1)

if not b:
raise ValueError(f"Can't read prefix: '{b.hex()}'!")

n: int = {b"\xfd": 2, b"\xfe": 4, b"\xff": 8}.get(b, 1) # default to 1

b = buf.read(n) if n > 1 else b

if len(b) != n:
raise ValueError("Can't read varint!")

return int.from_bytes(b, byteorder="little")


def read(buf: BytesIO, size: int) -> bytes:
b: bytes = buf.read(size)

if len(b) < size:
raise ValueError(f"Can't read {size} bytes in buffer!")

return b


def read_uint(buf: BytesIO,
bit_len: int,
byteorder: Literal['big', 'little'] = 'little') -> int:
size: int = bit_len // 8
b: bytes = buf.read(size)

if len(b) < size:
raise ValueError(f"Can't read u{bit_len} in buffer!")

return int.from_bytes(b, byteorder)
Empty file.
Loading

0 comments on commit 3b35d0d

Please sign in to comment.