tests: Place ML ops explicitly on CPU
torzdf committed May 22, 2024
1 parent c7f494c commit ba67828
Showing 5 changed files with 116 additions and 111 deletions.
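Every change below applies the same pattern: any test that creates Keras variables or runs a model is wrapped in a device("cpu") scope so the ops are pinned to the CPU regardless of which accelerators are visible. As a minimal sketch of that pattern (assuming Keras 3, where keras.device is a context manager; the array shape here is arbitrary and only for illustration):

    import numpy as np
    from keras import Variable, device

    with device("cpu"):
        # Variable creation, and any ops on it, are placed on the CPU device
        var = Variable(np.random.random((4, 4)).astype("float32"))
        assert var.numpy().shape == (4, 4)
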
28 changes: 15 additions & 13 deletions tests/lib/model/initializers_test.py
@@ -7,7 +7,7 @@
 import pytest
 import numpy as np
 
-from keras import initializers as k_initializers, Variable
+from keras import device, initializers as k_initializers, Variable
 
 from lib.model import initializers
 from lib.utils import get_backend
@@ -18,7 +18,8 @@
 
 def _runner(init, shape, target_mean=None, target_std=None,
             target_max=None, target_min=None):
-    variable = Variable(init(shape))
+    with device("cpu"):
+        variable = Variable(init(shape))
     output = variable.numpy()
     lim = 3e-2
     if target_std is not None:
@@ -40,13 +41,13 @@ def test_icnr(tensor_shape):
     tensor_shape: tuple
         The shape of the tensor to feed to the initializer
     """
-    fan_in, _ = initializers.compute_fans(tensor_shape)
-    std = np.sqrt(2. / fan_in)
-    _runner(initializers.ICNR(initializer=k_initializers.he_uniform(),  # pylint:disable=no-member
-                              scale=2),
-            tensor_shape,
-            target_mean=0,
-            target_std=std)
+    with device("cpu"):
+        fan_in, _ = initializers.compute_fans(tensor_shape)
+        std = np.sqrt(2. / fan_in)
+        _runner(initializers.ICNR(initializer=k_initializers.he_uniform(), scale=2),
+                tensor_shape,
+                target_mean=0,
+                target_std=std)
 
 
 @pytest.mark.parametrize('tensor_shape', [CONV_SHAPE], ids=[CONV_ID])
@@ -58,7 +59,8 @@ def test_convolution_aware(tensor_shape):
     tensor_shape: tuple
         The shape of the tensor to feed to the initializer
     """
-    fan_in, _ = initializers.compute_fans(tensor_shape)
-    std = np.sqrt(2. / fan_in)
-    _runner(initializers.ConvolutionAware(seed=123), tensor_shape,
-            target_mean=0, target_std=std)
+    with device("cpu"):
+        fan_in, _ = initializers.compute_fans(tensor_shape)
+        std = np.sqrt(2. / fan_in)
+        _runner(initializers.ConvolutionAware(seed=123), tensor_shape,
+                target_mean=0, target_std=std)
145 changes: 73 additions & 72 deletions tests/lib/model/layers_test.py
@@ -10,7 +10,7 @@
 
 from numpy.testing import assert_allclose
 
-from keras import Input, Model, backend as K
+from keras import device, Input, Model, backend as K
 
 from lib.model import layers
 from lib.utils import get_backend
@@ -26,77 +26,78 @@ def layer_test(layer_cls, kwargs={}, input_shape=None, input_dtype=None,  # noqa
     """Test routine for a layer with a single input tensor
     and single output tensor.
     """
-    # generate input data
-    if input_data is None:
-        assert input_shape
-        if not input_dtype:
-            input_dtype = K.floatx()
-        input_data_shape = list(input_shape)
-        for i, var_e in enumerate(input_data_shape):
-            if var_e is None:
-                input_data_shape[i] = np.random.randint(1, 4)
-        input_data = 10 * np.random.random(input_data_shape)
-        input_data = input_data.astype(input_dtype)
-    else:
-        if input_shape is None:
-            input_shape = input_data.shape
-        if input_dtype is None:
-            input_dtype = input_data.dtype
-    if expected_output_dtype is None:
-        expected_output_dtype = input_dtype
-
-    # instantiation
-    layer = layer_cls(**kwargs)
-
-    # test get_weights , set_weights at layer level
-    weights = layer.get_weights()
-    layer.set_weights(weights)
-
-    layer.build(input_shape)
-    expected_output_shape = layer.compute_output_shape(input_shape)
-
-    # test in functional API
-    if fixed_batch_size:
-        inp = Input(batch_shape=input_shape, dtype=input_dtype)
-    else:
-        inp = Input(shape=input_shape[1:], dtype=input_dtype)
-    outp = layer(inp)
-    assert outp.dtype == expected_output_dtype
-
-    # check with the functional API
-    model = Model(inp, outp)
-
-    actual_output = model.predict(input_data, verbose=0)
-    actual_output_shape = actual_output.shape
-    for expected_dim, actual_dim in zip(expected_output_shape,
-                                        actual_output_shape):
-        if expected_dim is not None:
-            assert expected_dim == actual_dim
-
-    if expected_output is not None:
-        assert_allclose(actual_output, expected_output, rtol=1e-3)
-
-    # test serialization, weight setting at model level
-    model_config = model.get_config()
-    recovered_model = model.__class__.from_config(model_config)
-    if model.weights:
-        weights = model.get_weights()
-        recovered_model.set_weights(weights)
-        _output = recovered_model.predict(input_data, verbose=0)
-        assert_allclose(_output, actual_output, rtol=1e-3)
-
-    # test training mode (e.g. useful when the layer has a
-    # different behavior at training and testing time).
-    if has_arg(layer.call, 'training'):
-        model.compile('rmsprop', 'mse')
-        model.train_on_batch(input_data, actual_output)
-
-    # test instantiation from layer config
-    layer_config = layer.get_config()
-    layer = layer.__class__.from_config(layer_config)
-
-    # for further checks in the caller function
-    return actual_output
+    with device("cpu"):
+        # generate input data
+        if input_data is None:
+            assert input_shape
+            if not input_dtype:
+                input_dtype = K.floatx()
+            input_data_shape = list(input_shape)
+            for i, var_e in enumerate(input_data_shape):
+                if var_e is None:
+                    input_data_shape[i] = np.random.randint(1, 4)
+            input_data = 10 * np.random.random(input_data_shape)
+            input_data = input_data.astype(input_dtype)
+        else:
+            if input_shape is None:
+                input_shape = input_data.shape
+            if input_dtype is None:
+                input_dtype = input_data.dtype
+        if expected_output_dtype is None:
+            expected_output_dtype = input_dtype
+
+        # instantiation
+        layer = layer_cls(**kwargs)
+
+        # test get_weights , set_weights at layer level
+        weights = layer.get_weights()
+        layer.set_weights(weights)
+
+        layer.build(input_shape)
+        expected_output_shape = layer.compute_output_shape(input_shape)
+
+        # test in functional API
+        if fixed_batch_size:
+            inp = Input(batch_shape=input_shape, dtype=input_dtype)
+        else:
+            inp = Input(shape=input_shape[1:], dtype=input_dtype)
+        outp = layer(inp)
+        assert outp.dtype == expected_output_dtype
+
+        # check with the functional API
+        model = Model(inp, outp)
+
+        actual_output = model.predict(input_data, verbose=0)
+        actual_output_shape = actual_output.shape
+        for expected_dim, actual_dim in zip(expected_output_shape,
+                                            actual_output_shape):
+            if expected_dim is not None:
+                assert expected_dim == actual_dim
+
+        if expected_output is not None:
+            assert_allclose(actual_output, expected_output, rtol=1e-3)
+
+        # test serialization, weight setting at model level
+        model_config = model.get_config()
+        recovered_model = model.__class__.from_config(model_config)
+        if model.weights:
+            weights = model.get_weights()
+            recovered_model.set_weights(weights)
+            _output = recovered_model.predict(input_data, verbose=0)
+            assert_allclose(_output, actual_output, rtol=1e-3)
+
+        # test training mode (e.g. useful when the layer has a
+        # different behavior at training and testing time).
+        if has_arg(layer.call, 'training'):
+            model.compile('rmsprop', 'mse')
+            model.train_on_batch(input_data, actual_output)
+
+        # test instantiation from layer config
+        layer_config = layer.get_config()
+        layer = layer.__class__.from_config(layer_config)
+
+        # for further checks in the caller function
+        return actual_output


 @pytest.mark.parametrize('dummy', [None], ids=[get_backend().upper()])
17 changes: 9 additions & 8 deletions tests/lib/model/nn_blocks_test.py
@@ -11,7 +11,7 @@
 
 from numpy.testing import assert_allclose
 
-from keras import Input, Model, backend as K
+from keras import device, Input, Model, backend as K
 
 from lib.model import nn_blocks
 from lib.utils import get_backend
@@ -66,10 +66,11 @@ def test_blocks(use_icnr_init, use_convaware_init, use_reflect_padding):
               "conv_aware_init": use_convaware_init,
               "reflect_padding": use_reflect_padding}
     nn_blocks.set_config(config)
-    block_test(nn_blocks.Conv2DOutput(64, 3), input_shape=(2, 8, 8, 32))
-    block_test(nn_blocks.Conv2DBlock(64), input_shape=(2, 8, 8, 32))
-    block_test(nn_blocks.SeparableConv2DBlock(64), input_shape=(2, 8, 8, 32))
-    block_test(nn_blocks.UpscaleBlock(64), input_shape=(2, 4, 4, 128))
-    block_test(nn_blocks.Upscale2xBlock(64, fast=True), input_shape=(2, 4, 4, 128))
-    block_test(nn_blocks.Upscale2xBlock(64, fast=False), input_shape=(2, 4, 4, 128))
-    block_test(nn_blocks.ResidualBlock(64), input_shape=(2, 4, 4, 64))
+    with device("cpu"):
+        block_test(nn_blocks.Conv2DOutput(64, 3), input_shape=(2, 8, 8, 32))
+        block_test(nn_blocks.Conv2DBlock(64), input_shape=(2, 8, 8, 32))
+        block_test(nn_blocks.SeparableConv2DBlock(64), input_shape=(2, 8, 8, 32))
+        block_test(nn_blocks.UpscaleBlock(64), input_shape=(2, 4, 4, 128))
+        block_test(nn_blocks.Upscale2xBlock(64, fast=True), input_shape=(2, 4, 4, 128))
+        block_test(nn_blocks.Upscale2xBlock(64, fast=False), input_shape=(2, 4, 4, 128))
+        block_test(nn_blocks.ResidualBlock(64), input_shape=(2, 4, 4, 64))
27 changes: 14 additions & 13 deletions tests/lib/model/normalization_test.py
@@ -8,7 +8,7 @@
 import numpy as np
 import pytest
 
-from keras import regularizers, models, layers
+from keras import device, regularizers, models, layers
 
 from lib.model import normalization
 from lib.utils import get_backend
@@ -72,18 +72,19 @@ def test_group_normalization(dummy):  # pylint:disable=unused-argument
 @pytest.mark.parametrize(_PARAMS, _VALUES, ids=_IDS)
 def test_adain_normalization(center, scale):
     """ Basic test for Ada Instance Normalization. """
-    norm = normalization.AdaInstanceNormalization(center=center, scale=scale)
-    shapes = [(4, 8, 8, 1280), (4, 1, 1, 1280), (4, 1, 1, 1280)]
-    norm.build(shapes)
-    expected_output_shape = norm.compute_output_shape(shapes)
-    inputs = [layers.Input(shape=shapes[0][1:]),
-              layers.Input(shape=shapes[1][1:]),
-              layers.Input(shape=shapes[2][1:])]
-    model = models.Model(inputs, norm(inputs))
-    data = [10 * np.random.random(shape) for shape in shapes]
-
-    actual_output = model.predict(data, verbose=0)
-    actual_output_shape = actual_output.shape
+    with device("cpu"):
+        norm = normalization.AdaInstanceNormalization(center=center, scale=scale)
+        shapes = [(4, 8, 8, 1280), (4, 1, 1, 1280), (4, 1, 1, 1280)]
+        norm.build(shapes)
+        expected_output_shape = norm.compute_output_shape(shapes)
+        inputs = [layers.Input(shape=shapes[0][1:]),
+                  layers.Input(shape=shapes[1][1:]),
+                  layers.Input(shape=shapes[2][1:])]
+        model = models.Model(inputs, norm(inputs))
+        data = [10 * np.random.random(shape) for shape in shapes]
+
+        actual_output = model.predict(data, verbose=0)
+        actual_output_shape = actual_output.shape
 
     for expected_dim, actual_dim in zip(expected_output_shape,
                                         actual_output_shape):
10 changes: 5 additions & 5 deletions tests/lib/model/optimizers_test.py
@@ -7,8 +7,7 @@
 
 import numpy as np
 
-from keras import optimizers as k_optimizers
-from keras import layers as kl, Sequential
+from keras import device, layers as kl, optimizers as k_optimizers, Sequential
 
 from lib.model import optimizers
 from lib.utils import get_backend
@@ -37,8 +36,8 @@ def _test_optimizer(optimizer, target=0.75):
     model.add(kl.Dense(y_train.shape[1]))
     model.add(kl.Activation("softmax"))
     model.compile(loss="categorical_crossentropy",
-                    optimizer=optimizer,
-                    metrics=["accuracy"])
+                  optimizer=optimizer,
+                  metrics=["accuracy"])
 
     history = model.fit(x_train, y_train, epochs=2, batch_size=16, verbose=0)
     assert history.history["accuracy"][-1] >= target
@@ -53,4 +52,5 @@ def _test_optimizer(optimizer, target=0.75):
 @pytest.mark.parametrize("dummy", [None], ids=[get_backend().upper()])
 def test_adabelief(dummy):  # pylint:disable=unused-argument
     """ Test for custom Adam optimizer """
-    _test_optimizer(optimizers.AdaBelief(), target=0.20)
+    with device("cpu"):
+        _test_optimizer(optimizers.AdaBelief(), target=0.20)
