From cc01b8fdacc75eb028b6cb7f924e1539edc40d28 Mon Sep 17 00:00:00 2001 From: Anand Balakrishnan Date: Sun, 18 Feb 2024 21:45:56 -0800 Subject: [PATCH 1/4] Make package installable by setuptools --- src/__init__.py | 0 __init__.py => stlcg/__init__.py | 0 {src => stlcg}/logger.py | 0 {src => stlcg}/stlcg.py | 0 {src => stlcg}/stlviz.py | 0 {src => stlcg}/utils.py | 0 6 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 src/__init__.py rename __init__.py => stlcg/__init__.py (100%) rename {src => stlcg}/logger.py (100%) rename {src => stlcg}/stlcg.py (100%) rename {src => stlcg}/stlviz.py (100%) rename {src => stlcg}/utils.py (100%) diff --git a/src/__init__.py b/src/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/__init__.py b/stlcg/__init__.py similarity index 100% rename from __init__.py rename to stlcg/__init__.py diff --git a/src/logger.py b/stlcg/logger.py similarity index 100% rename from src/logger.py rename to stlcg/logger.py diff --git a/src/stlcg.py b/stlcg/stlcg.py similarity index 100% rename from src/stlcg.py rename to stlcg/stlcg.py diff --git a/src/stlviz.py b/stlcg/stlviz.py similarity index 100% rename from src/stlviz.py rename to stlcg/stlviz.py diff --git a/src/utils.py b/stlcg/utils.py similarity index 100% rename from src/utils.py rename to stlcg/utils.py From b47c387f605ba6591ee0c30d1c63c4031a52c98d Mon Sep 17 00:00:00 2001 From: Anand Balakrishnan Date: Sun, 18 Feb 2024 21:57:31 -0800 Subject: [PATCH 2/4] style: run formatter --- examples/pstl.py | 124 +++-- pyproject.toml | 31 ++ setup.py | 22 - stlcg/logger.py | 9 +- stlcg/stlcg.py | 1140 ++++++++++++++++++++++++++++++++++------------ stlcg/stlviz.py | 45 +- stlcg/utils.py | 32 +- 7 files changed, 1019 insertions(+), 384 deletions(-) create mode 100644 pyproject.toml diff --git a/examples/pstl.py b/examples/pstl.py index e060681..ed07025 100644 --- a/examples/pstl.py +++ b/examples/pstl.py @@ -1,39 +1,43 @@ import sys -sys.path.append('../src') -import matplotlib.pyplot as plt -import numpy as np + +sys.path.append("../src") import os import pickle -import torch -import stlcg +import matplotlib.pyplot as plt +import numpy as np import scipy.io +import torch + +import stlcg -mat = scipy.io.loadmat('models/clustering_data.mat') +mat = scipy.io.loadmat("models/clustering_data.mat") -ys_ = mat['ys'] +ys_ = mat["ys"] ys = np.zeros(ys_.shape) -ys[:,:] = np.fliplr(ys_) -t = mat['T'][0,:] +ys[:, :] = np.fliplr(ys_) +t = mat["T"][0, :] N = ys.shape[0] + def run_multiple_times(num, func, ys): for i in range(num): func(ys, save_filename=None) + def binary_search_settling(ys, save_filename="models/pstl_settling_binary_search.npy"): ϵ_list = [] N = ys.shape[0] for i in range(N): - y = torch.as_tensor(ys[i:i+1,:]).float().unsqueeze(-1) - s = stlcg.Expression('s', torch.abs(y - 1)) + y = torch.as_tensor(ys[i : i + 1, :]).float().unsqueeze(-1) + s = stlcg.Expression("s", torch.abs(y - 1)) ϵL = torch.as_tensor(np.zeros([1, 1, 1])).float().requires_grad_(True) ϵU = torch.as_tensor(np.ones([1, 1, 1])).float().requires_grad_(True) ϵ = torch.as_tensor(np.ones([1, 1, 1])).float().requires_grad_(True) ϕL = stlcg.Always(subformula=(s < ϵL), interval=[50, 100]) ϕU = stlcg.Always(subformula=(s < ϵU), interval=[50, 100]) - ϕ = stlcg.Always(subformula=(s < ϵ), interval=[50, 100]) - while torch.abs(ϕU.subformula.val - ϕL.subformula.val) > 5*1E-3: + ϕ = stlcg.Always(subformula=(s < ϵ), interval=[50, 100]) + while torch.abs(ϕU.subformula.val - ϕL.subformula.val) > 5 * 1e-3: ϵ = 0.5 * (ϕU.subformula.val + ϕL.subformula.val) ϕ.subformula.val = ϵ r = ϕ.robustness(s).squeeze() @@ -47,29 +51,39 @@ def binary_search_settling(ys, save_filename="models/pstl_settling_binary_search return np.stack(ϵ_list) np.save(save_filename, np.stack(ϵ_list)) + def binary_search_settling_vectorize(ys): N = ys.shape[0] y = torch.as_tensor(ys).float().unsqueeze(-1) - s = stlcg.Expression('s', torch.abs(y - 1)) + s = stlcg.Expression("s", torch.abs(y - 1)) ϵL = torch.as_tensor(np.zeros([N, 1, 1])).float().requires_grad_(True) ϵU = torch.as_tensor(np.ones([N, 1, 1])).float().requires_grad_(True) ϵ = torch.as_tensor(np.ones([N, 1, 1])).float().requires_grad_(True) ϕL = stlcg.Always(subformula=(s < ϵL), interval=[50, 100]) ϕU = stlcg.Always(subformula=(s < ϵU), interval=[50, 100]) - ϕ = stlcg.Always(subformula=(s < ϵ), interval=[50, 100]) - while torch.abs(ϕU.subformula.val - ϕL.subformula.val).max() > 5*1E-3: + ϕ = stlcg.Always(subformula=(s < ϵ), interval=[50, 100]) + while torch.abs(ϕU.subformula.val - ϕL.subformula.val).max() > 5 * 1e-3: ϵ = 0.5 * (ϕU.subformula.val + ϕL.subformula.val) ϕ.subformula.val = ϵ r = ϕ.robustness(s) - ϕU.subformula.val = torch.where(torch.abs(ϕU.subformula.val - ϕL.subformula.val) > 5*1E-3, torch.where(r > 0, ϵ, ϕU.subformula.val), ϕU.subformula.val) - ϕL.subformula.val = torch.where(torch.abs(ϕU.subformula.val - ϕL.subformula.val) > 5*1E-3, torch.where(r <= 0, ϵ, ϕL.subformula.val), ϕL.subformula.val) + ϕU.subformula.val = torch.where( + torch.abs(ϕU.subformula.val - ϕL.subformula.val) > 5 * 1e-3, + torch.where(r > 0, ϵ, ϕU.subformula.val), + ϕU.subformula.val, + ) + ϕL.subformula.val = torch.where( + torch.abs(ϕU.subformula.val - ϕL.subformula.val) > 5 * 1e-3, + torch.where(r <= 0, ϵ, ϕL.subformula.val), + ϕL.subformula.val, + ) return ϵ.squeeze().detach().numpy() + def stlcg_settling(ys, save_filename="models/pstl_settling_stlcg.npy"): max_epoch = 1000 N = ys.shape[0] y = torch.as_tensor(ys).float().unsqueeze(-1) - s = stlcg.Expression('s', torch.abs(y - 1)) + s = stlcg.Expression("s", torch.abs(y - 1)) ϵ = torch.as_tensor(np.zeros([N, 1, 1])).float().requires_grad_(True) ϕ = stlcg.Always(subformula=(s < ϵ), interval=[50, 100]) @@ -79,7 +93,7 @@ def stlcg_settling(ys, save_filename="models/pstl_settling_stlcg.npy"): loss.backward() with torch.no_grad(): - ϵ -= 0.005* ϵ.grad + ϵ -= 0.005 * ϵ.grad ϵ.grad.zero_() if loss == 0: @@ -89,19 +103,21 @@ def stlcg_settling(ys, save_filename="models/pstl_settling_stlcg.npy"): np.save(save_filename, ϵ.squeeze().detach().numpy()) -def binary_search_overshoot(ys, save_filename="models/pstl_overshoot_binary_search.npy"): +def binary_search_overshoot( + ys, save_filename="models/pstl_overshoot_binary_search.npy" +): ϵ_list = [] N = ys.shape[0] for i in range(N): - y = torch.as_tensor(ys[i:i+1,:]).float().unsqueeze(-1) - s = stlcg.Expression('s', y) + y = torch.as_tensor(ys[i : i + 1, :]).float().unsqueeze(-1) + s = stlcg.Expression("s", y) ϵL = torch.as_tensor(np.zeros([1, 1, 1])).float().requires_grad_(True) - ϵU = torch.as_tensor(2*np.ones([1, 1, 1])).float().requires_grad_(True) + ϵU = torch.as_tensor(2 * np.ones([1, 1, 1])).float().requires_grad_(True) ϵ = torch.as_tensor(np.ones([1, 1, 1])).float().requires_grad_(True) ϕL = stlcg.Always(subformula=(s < ϵL)) ϕU = stlcg.Always(subformula=(s < ϵU)) - ϕ = stlcg.Always(subformula=(s < ϵ)) - while torch.abs(ϕU.subformula.val - ϕL.subformula.val) > 5*1E-3: + ϕ = stlcg.Always(subformula=(s < ϵ)) + while torch.abs(ϕU.subformula.val - ϕL.subformula.val) > 5 * 1e-3: ϵ = 0.5 * (ϕU.subformula.val + ϕL.subformula.val) ϕ.subformula.val = ϵ r = ϕ.robustness(s).squeeze() @@ -114,30 +130,40 @@ def binary_search_overshoot(ys, save_filename="models/pstl_overshoot_binary_sear if save_filename is None: return np.stack(ϵ_list) np.save(save_filename, np.stack(ϵ_list)) - + + def binary_search_overshoot_vectorize(ys): N = ys.shape[0] y = torch.as_tensor(ys).float().unsqueeze(-1) - s = stlcg.Expression('s', y) + s = stlcg.Expression("s", y) ϵL = torch.as_tensor(np.zeros([N, 1, 1])).float().requires_grad_(True) - ϵU = torch.as_tensor(2*np.ones([N, 1, 1])).float().requires_grad_(True) + ϵU = torch.as_tensor(2 * np.ones([N, 1, 1])).float().requires_grad_(True) ϵ = torch.as_tensor(np.ones([N, 1, 1])).float().requires_grad_(True) ϕL = stlcg.Always(subformula=(s < ϵL)) ϕU = stlcg.Always(subformula=(s < ϵU)) - ϕ = stlcg.Always(subformula=(s < ϵ)) - while torch.abs(ϕU.subformula.val - ϕL.subformula.val).max() > 5*1E-3: + ϕ = stlcg.Always(subformula=(s < ϵ)) + while torch.abs(ϕU.subformula.val - ϕL.subformula.val).max() > 5 * 1e-3: ϵ = 0.5 * (ϕU.subformula.val + ϕL.subformula.val) ϕ.subformula.val = ϵ r = ϕ.robustness(s) - ϕU.subformula.val = torch.where(torch.abs(ϕU.subformula.val - ϕL.subformula.val) > 5*1E-3, torch.where(r > 0, ϵ, ϕU.subformula.val), ϕU.subformula.val) - ϕL.subformula.val = torch.where(torch.abs(ϕU.subformula.val - ϕL.subformula.val) > 5*1E-3, torch.where(r <= 0, ϵ, ϕL.subformula.val), ϕL.subformula.val) + ϕU.subformula.val = torch.where( + torch.abs(ϕU.subformula.val - ϕL.subformula.val) > 5 * 1e-3, + torch.where(r > 0, ϵ, ϕU.subformula.val), + ϕU.subformula.val, + ) + ϕL.subformula.val = torch.where( + torch.abs(ϕU.subformula.val - ϕL.subformula.val) > 5 * 1e-3, + torch.where(r <= 0, ϵ, ϕL.subformula.val), + ϕL.subformula.val, + ) return ϵ.squeeze().detach().numpy() + def stlcg_overshoot(ys, save_filename="models/pstl_overshoot_stlcg.npy"): N = ys.shape[0] max_epoch = 1000 y = torch.as_tensor(ys).float().unsqueeze(-1) - s = stlcg.Expression('s', y) + s = stlcg.Expression("s", y) ϵ = torch.as_tensor(np.zeros([N, 1, 1])).float().requires_grad_(True) ϕ = stlcg.Always(subformula=(s < ϵ)) for epoch in range(max_epoch): @@ -146,7 +172,7 @@ def stlcg_overshoot(ys, save_filename="models/pstl_overshoot_stlcg.npy"): loss.backward() with torch.no_grad(): - ϵ -= 0.005* ϵ.grad + ϵ -= 0.005 * ϵ.grad if loss == 0: break ϵ.grad.zero_() @@ -160,18 +186,17 @@ def stlcg_gpu_settling(ys, save_filename="models/pstl_settling_stlcg_gpu.npy"): max_epoch = 1000 N = ys.shape[0] y = torch.as_tensor(ys).float().unsqueeze(-1).cuda() - s = stlcg.Expression('s', torch.abs(y - 1)) + s = stlcg.Expression("s", torch.abs(y - 1)) ϵ = torch.as_tensor(np.zeros([N, 1, 1])).float().cuda().requires_grad_(True) ϕ = stlcg.Always(subformula=(s < ϵ), interval=[50, 100]) - for epoch in range(max_epoch): loss = torch.relu(-ϕ.robustness(s).squeeze()).sum() loss.backward() with torch.no_grad(): - ϵ -= 0.005* ϵ.grad + ϵ -= 0.005 * ϵ.grad ϵ.grad.zero_() if loss == 0: @@ -180,11 +205,12 @@ def stlcg_gpu_settling(ys, save_filename="models/pstl_settling_stlcg_gpu.npy"): return ϵ.squeeze().cpu().detach().numpy() np.save(save_filename, ϵ.squeeze().cpu().detach().numpy()) + def stlcg_gpu_overshoot(ys, save_filename="models/pstl_overshoot_stlcg_gpu.npy"): N = ys.shape[0] max_epoch = 1000 y = torch.as_tensor(ys).float().unsqueeze(-1).cuda() - s = stlcg.Expression('s', y) + s = stlcg.Expression("s", y) ϵ = torch.as_tensor(np.zeros([N, 1, 1])).float().cuda().requires_grad_(True) ϕ = stlcg.Always(subformula=(s < ϵ)) for epoch in range(max_epoch): @@ -193,7 +219,7 @@ def stlcg_gpu_overshoot(ys, save_filename="models/pstl_overshoot_stlcg_gpu.npy") loss.backward() with torch.no_grad(): - ϵ -= 0.005* ϵ.grad + ϵ -= 0.005 * ϵ.grad if loss == 0: break ϵ.grad.zero_() @@ -203,14 +229,20 @@ def stlcg_gpu_overshoot(ys, save_filename="models/pstl_overshoot_stlcg_gpu.npy") np.save(save_filename, ϵ.squeeze().cpu().detach().numpy()) - if __name__ == "__main__": arguments = sys.argv[1:] # func_i, M, num M = 500 # batch size num = 1 # number of repetitions func_i = int(sys.argv[1]) - func_map = {1: stlcg_settling, 2: binary_search_settling, 3: stlcg_overshoot, 4: binary_search_overshoot, 5: stlcg_gpu_settling, 6: stlcg_gpu_overshoot} + func_map = { + 1: stlcg_settling, + 2: binary_search_settling, + 3: stlcg_overshoot, + 4: binary_search_overshoot, + 5: stlcg_gpu_settling, + 6: stlcg_gpu_overshoot, + } if len(arguments) == 2: M = int(sys.argv[2]) elif len(arguments) == 3: @@ -218,11 +250,9 @@ def stlcg_gpu_overshoot(ys, save_filename="models/pstl_overshoot_stlcg_gpu.npy") num = int(sys.argv[3]) # print(func_map[func_i].__name__, " M=%i num=%i"%(M,num)) - mat = scipy.io.loadmat('models/clustering_data.mat') + mat = scipy.io.loadmat("models/clustering_data.mat") - ys_ = np.concatenate([mat['ys'], mat['ys'], mat['ys']], axis=0)[:M,:] + ys_ = np.concatenate([mat["ys"], mat["ys"], mat["ys"]], axis=0)[:M, :] ys = np.zeros(ys_.shape) - ys[:,:] = np.fliplr(ys_) + ys[:, :] = np.fliplr(ys_) run_multiple_times(num, func_map[func_i], ys) - - diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..a56680c --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,31 @@ +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[project] +name = "stlcg" +version = "0.0.1" +authors = [ + { name = "Karen Leung", email = "karen.ym.leung@gmail.com" }, + { name = "Nikos Arechiga", email = "nikos.arechiga@tri.global" }, +] +description = "A toolbox to compute the robustness of STL formulas using computations graphs (PyTorch)." +readme = "README.md" + +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", +] + +[project.urls] +repository = "https://github.com/StanfordASL/stlcg" + +[tool.setuptools] +packages = ["stlcg"] + +[tool.isort] +profile = "black" +line_length = 88 +skip_gitignore = false +group_by_package = true diff --git a/setup.py b/setup.py index 6421a80..e69de29 100644 --- a/setup.py +++ b/setup.py @@ -1,22 +0,0 @@ -import setuptools - -with open("README.md", "r") as fh: - long_description = fh.read() - -setuptools.setup( - name="stlcg", - version="0.0.1", - author="Karen Leung and Nikos Arechiga", - author_email="karen.ym.leung@gmail.com and nikos.arechiga@tri.global", - description="A toolbox to compute the robustness of STL formulas using computations graphs (PyTorch).", - long_description=long_description, - long_description_content_type="text/markdown", - url="https://github.com/StanfordASL/stlcg", - packages=setuptools.find_packages(), - classifiers=[ - "Programming Language :: Python :: 3", - "License :: OSI Approved :: MIT License", - "Operating System :: OS Independent", - ], -) - diff --git a/stlcg/logger.py b/stlcg/logger.py index 116794b..7b7bce1 100644 --- a/stlcg/logger.py +++ b/stlcg/logger.py @@ -1,9 +1,10 @@ -import tensorflow as tf import numpy as np -import scipy.misc +import scipy.misc +import tensorflow as tf + class Logger(object): - + def __init__(self, log_dir): """Create a summary writer logging to log_dir.""" self.writer = tf.summary.FileWriter(log_dir) @@ -39,4 +40,4 @@ def histo_summary(self, tag, values, step, bins=1000): # Create and write Summary summary = tf.Summary(value=[tf.Summary.Value(tag=tag, histo=hist)]) self.writer.add_summary(summary, step) - self.writer.flush() \ No newline at end of file + self.writer.flush() diff --git a/stlcg/stlcg.py b/stlcg/stlcg.py index c947ef9..4deb869 100644 --- a/stlcg/stlcg.py +++ b/stlcg/stlcg.py @@ -1,11 +1,10 @@ # -*- coding: utf-8 -*- -import torch import numpy as np -import os -import sys -import IPython +import torch + +# import IPython # from utils import tensor_to_str -''' +""" Important information: - This has the option to use an arithmetic-geometric mean robustness metric: https://arxiv.org/pdf/1903.05186.pdf. The default is not to use it. But this is still being tested. - Assume inputs are already reversed, but user does not need to worry about the indexing. @@ -13,7 +12,7 @@ - "scale" is the scale used in maxish and minish which Always, Eventually, Until, and Then uses. - "time" variable when computing robustness: time=0 means the current time, t=1 means next time step. The reversal of the trace is accounted for inside the function, the user does not need to worry about this - must specify subformula (no default string value) -''' +""" # TODO: @@ -22,12 +21,13 @@ # - Implement log-barrier # - option to choose type of padding -LARGE_NUMBER = 1E4 +LARGE_NUMBER = 1e4 + def tensor_to_str(tensor): - ''' + """ turn tensor into a string for printing - ''' + """ device = tensor.device.type req_grad = tensor.requires_grad if req_grad == False: @@ -37,6 +37,7 @@ def tensor_to_str(tensor): tensor = tensor.cpu() return str(tensor.numpy()) + def convert_to_input_values(inputs): x_, y_ = inputs if isinstance(x_, Expression): @@ -63,15 +64,16 @@ def convert_to_input_values(inputs): class Maxish(torch.nn.Module): - ''' + """ Function to compute the max, or softmax, or other variants of the max function. - ''' + """ + def __init__(self, name="Maxish input"): super(Maxish, self).__init__() self.input_name = name def forward(self, x, scale, dim=1, keepdim=True, agm=False, distributed=False): - ''' + """ x is of size [batch_size, T, ...] where T is typically the trace length. if scale <= 0, then the true max is used, otherwise, the softmax is used. @@ -83,54 +85,62 @@ def forward(self, x, scale, dim=1, keepdim=True, agm=False, distributed=False): agm is the arithmetic-geometric mean. Currently in progress. If some elements are >0, output is the average of those elements. If all the elements <= 0, output is -ᵐ√(Πᵢ (1 - ηᵢ)) + 1. scale doesn't play a role here except to switch between the using the AGM or true robustness value (scale <=0). Default: False distributed addresses the case when there are multiple max values. As max is poorly defined in these cases, PyTorch (randomly?) selects one of the max values only. If distributed=True and scale <=0 then it will average over the max values and split the gradients equally. Default: False - ''' + """ if isinstance(x, Expression): - assert x.value is not None, "Input Expression does not have numerical values" + assert ( + x.value is not None + ), "Input Expression does not have numerical values" x = x.value if scale > 0: if agm == True: if torch.gt(x, 0).any(): - return x[torch.gt(x, 0)].reshape(*x.shape[:-1], -1).mean(dim=dim, keepdim=keepdim) + return ( + x[torch.gt(x, 0)] + .reshape(*x.shape[:-1], -1) + .mean(dim=dim, keepdim=keepdim) + ) else: - return -torch.log(1-x).mean(dim=dim, keepdim=keepdim).exp() + 1 + return -torch.log(1 - x).mean(dim=dim, keepdim=keepdim).exp() + 1 else: # return torch.log(torch.exp(x*scale).sum(dim=dim, keepdim=keepdim))/scale - return torch.logsumexp(x * scale, dim=dim, keepdim=keepdim)/scale + return torch.logsumexp(x * scale, dim=dim, keepdim=keepdim) / scale else: if distributed: return self.distributed_true_max(x, dim=dim, keepdim=keepdim) else: return x.max(dim, keepdim=keepdim)[0] - @staticmethod def distributed_true_max(xx, dim=1, keepdim=True): - ''' + """ If there are multiple values that share the same max value, the max value is computed by taking the mean of all those values. This will ensure that the gradients of max(xx, dim=dim) will be distributed evenly across all values, rather than just the one value. - ''' + """ m, mi = torch.max(xx, dim, keepdim=True) inds = xx == m - return torch.where(inds, xx, xx*0).sum(dim, keepdim=keepdim) / inds.sum(dim, keepdim=keepdim) - + return torch.where(inds, xx, xx * 0).sum(dim, keepdim=keepdim) / inds.sum( + dim, keepdim=keepdim + ) def _next_function(self): - ''' + """ This is used for the graph visualization to keep track of the parent node. - ''' + """ return [str(self.input_name)] + class Minish(torch.nn.Module): - ''' + """ Function to compute the min, or softmin, or other variants of the min function. - ''' + """ + def __init__(self, name="Minish input"): super(Minish, self).__init__() self.input_name = name def forward(self, x, scale, dim=1, keepdim=True, agm=False, distributed=False): - ''' + """ x is of size [batch_size, T, ...] where T is typically the trace length. if scale <= 0, then the true max is used, otherwise, the softmax is used. @@ -142,22 +152,26 @@ def forward(self, x, scale, dim=1, keepdim=True, agm=False, distributed=False): agm is the arithmetic-geometric mean. Currently in progress. If all elements are >0, output is ᵐ√(Πᵢ (1 + ηᵢ)) - 1.If some the elements <= 0, output is the average of those negative values. scale doesn't play a role here except to switch between the using the AGM or true robustness value (scale <=0). distributed addresses the case when there are multiple max values. As max is poorly defined in these cases, PyTorch (randomly?) selects one of the max values only. If distributed=True and scale <=0 then it will average over the max values and split the gradients equally. Default: False - ''' + """ if isinstance(x, Expression): - assert x.value is not None, "Input Expression does not have numerical values" + assert ( + x.value is not None + ), "Input Expression does not have numerical values" x = x.value if scale > 0: if agm == True: if torch.gt(x, 0).all(): - return torch.log(1+x).mean(dim=dim, keepdim=keepdim).exp() - 1 + return torch.log(1 + x).mean(dim=dim, keepdim=keepdim).exp() - 1 else: # return x[torch.lt(x, 0)].reshape(*x.shape[:-1], -1).mean(dim=dim, keepdim=keepdim) - return (torch.lt(x,0) * x).sum(dim, keepdim=keepdim) / torch.lt(x, 0).sum(dim, keepdim=keepdim) + return (torch.lt(x, 0) * x).sum(dim, keepdim=keepdim) / torch.lt( + x, 0 + ).sum(dim, keepdim=keepdim) else: # return -torch.log(torch.exp(-x*scale).sum(dim=dim, keepdim=keepdim))/scale - return -torch.logsumexp(-x * scale, dim=dim, keepdim=keepdim)/scale + return -torch.logsumexp(-x * scale, dim=dim, keepdim=keepdim) / scale else: if distributed: @@ -167,25 +181,25 @@ def forward(self, x, scale, dim=1, keepdim=True, agm=False, distributed=False): @staticmethod def distributed_true_min(xx, dim=1, keepdim=True): - ''' + """ If there are multiple values that share the same max value, the min value is computed by taking the mean of all those values. This will ensure that the gradients of min(xx, dim=dim) will be distributed evenly across all values, rather than just the one value. - ''' + """ m, mi = torch.min(xx, dim, keepdim=True) inds = xx == m - return torch.where(inds, xx, xx*0).sum(dim, keepdim=keepdim) / inds.sum(dim, keepdim=keepdim) - + return torch.where(inds, xx, xx * 0).sum(dim, keepdim=keepdim) / inds.sum( + dim, keepdim=keepdim + ) def _next_function(self): - ''' + """ This is used for the graph visualization to keep track of the parent node. - ''' + """ return [str(self.input_name)] - class STL_Formula(torch.nn.Module): - ''' + """ NOTE: All the inputs are assumed to be TIME REVERSED. The outputs are also TIME REVERSED All STL formulas have the following functions: robustness_trace: Computes the robustness trace. @@ -201,46 +215,148 @@ class STL_Formula(torch.nn.Module): keepdim: Output shape is the same as the input tensor shapes. Default: True agm: Use arithmetic-geometric mean. (In progress.) Default: False distributed: Use the distributed mean. Default: False - ''' + """ def __init__(self): super(STL_Formula, self).__init__() - def robustness_trace(self, trace, pscale=1, scale=-1, keepdim=True, agm=False, distributed=False, **kwargs): + def robustness_trace( + self, + trace, + pscale=1, + scale=-1, + keepdim=True, + agm=False, + distributed=False, + **kwargs, + ): raise NotImplementedError("robustness_trace not yet implemented") - def robustness(self, inputs, time=0, pscale=1, scale=-1, keepdim=True, agm=False,distributed=False, **kwargs): - ''' + def robustness( + self, + inputs, + time=0, + pscale=1, + scale=-1, + keepdim=True, + agm=False, + distributed=False, + **kwargs, + ): + """ Extracts the robustness_trace value at the given time. Default: time=0 assuming this is the index for the NON-REVERSED trace. But the code will take it from the end since the input signal is TIME REVERSED. - ''' - return self.forward(inputs, pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs)[:,-(time+1),:].unsqueeze(1) - - def eval_trace(self, inputs, pscale=1, scale=-1, keepdim=True, agm=False, distributed=False, **kwargs): - ''' + """ + return self.forward( + inputs, + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + )[:, -(time + 1), :].unsqueeze(1) + + def eval_trace( + self, + inputs, + pscale=1, + scale=-1, + keepdim=True, + agm=False, + distributed=False, + **kwargs, + ): + """ The values in eval_trace are 0 or 1 (False or True) - ''' - return self.forward(inputs, pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs) > 0 - - def eval(self, inputs, time=0, pscale=1, scale=-1, keepdim=True, agm=False, distributed=False, **kwargs): - ''' + """ + return ( + self.forward( + inputs, + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ) + > 0 + ) + + def eval( + self, + inputs, + time=0, + pscale=1, + scale=-1, + keepdim=True, + agm=False, + distributed=False, + **kwargs, + ): + """ Extracts the eval_trace value at the given time. Default: time=0 assuming this is the index for the NON-REVERSED trace. But the code will take it from the end since the input signal is TIME REVERSED. - ''' - return self.eval_trace(inputs, pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs)[:,-(time+1),:].unsqueeze(1) # [batch_size, time_dim, x_dim] - - def forward(formula, inputs, pscale=1, scale=-1, keepdim=True, agm=False, distributed=False, **kwargs): - ''' + """ + return self.eval_trace( + inputs, + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + )[:, -(time + 1), :].unsqueeze( + 1 + ) # [batch_size, time_dim, x_dim] + + def forward( + formula, + inputs, + pscale=1, + scale=-1, + keepdim=True, + agm=False, + distributed=False, + **kwargs, + ): + """ Evaluates the robustness_trace given the input. The input is converted to the numerical value first. - ''' + """ if isinstance(inputs, Expression): - assert inputs.value is not None, "Input Expression does not have numerical values" - return formula.robustness_trace(inputs.value, pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs) + assert ( + inputs.value is not None + ), "Input Expression does not have numerical values" + return formula.robustness_trace( + inputs.value, + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ) elif isinstance(inputs, torch.Tensor): - return formula.robustness_trace(inputs, pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs) + return formula.robustness_trace( + inputs, + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ) elif isinstance(inputs, tuple): - return formula.robustness_trace(convert_to_input_values(inputs), pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs) + return formula.robustness_trace( + convert_to_input_values(inputs), + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ) else: raise ValueError("Not a invalid input trace") @@ -258,8 +374,7 @@ def __invert__(phi): class Identity(STL_Formula): - - def __init__(self, name='x'): + def __init__(self, name="x"): super(Identity, self).__init__() self.name = name @@ -270,62 +385,67 @@ def _next_function(self): return [] def __str__(self): - return "%s" %self.name - - + return "%s" % self.name class Temporal_Operator(STL_Formula): - ''' + """ Class to compute Eventually and Always. This builds a recurrent cell to perform dynamic programming subformula: The formula that the temporal operator is applied to. interval: either None (defaults to [0, np.inf]), [a, b] ( b < np.inf), [a, np.inf] (a > 0) NOTE: Assume that the interval is describing the INDICES of the desired time interval. The user is responsible for converting the time interval (in time units) into indices (integers) using knowledge of the time step size. - ''' + """ + def __init__(self, subformula, interval=None): super(Temporal_Operator, self).__init__() self.subformula = subformula self.interval = interval self._interval = [0, np.inf] if self.interval is None else self.interval - self.rnn_dim = 1 if not self.interval else self.interval[-1] # rnn_dim=1 if interval is [0, ∞) otherwise rnn_dim=end of interval + self.rnn_dim = ( + 1 if not self.interval else self.interval[-1] + ) # rnn_dim=1 if interval is [0, ∞) otherwise rnn_dim=end of interval if self.rnn_dim == np.inf: self.rnn_dim = self.interval[0] - self.steps = 1 if not self.interval else self.interval[-1] - self.interval[0] + 1 # steps=1 if interval is [0, ∞) otherwise steps=length of interval + self.steps = ( + 1 if not self.interval else self.interval[-1] - self.interval[0] + 1 + ) # steps=1 if interval is [0, ∞) otherwise steps=length of interval self.operation = None # Matrices that shift a vector and add a new entry at the end. - self.M = torch.tensor(np.diag(np.ones(self.rnn_dim-1), k=1)).requires_grad_(False).float() + self.M = ( + torch.tensor(np.diag(np.ones(self.rnn_dim - 1), k=1)) + .requires_grad_(False) + .float() + ) self.b = torch.zeros(self.rnn_dim).unsqueeze(-1).requires_grad_(False).float() self.b[-1] = 1.0 def _initialize_rnn_cell(self, x): - - ''' + """ x is [batch_size, time_dim, x_dim] initial rnn state is [batch_size, rnn_dim, x_dim] This requires padding on the signal. Currently, the default is to extend the last value. TODO: have option on this padding The initial hidden state is of the form (hidden_state, count). count is needed just for the case with self.interval=[0, np.inf) and distributed=True. Since we are scanning through the sigal and outputing the min/max values incrementally, the distributed min function doesn't apply. If there are multiple min values along the signal, the gradient will be distributed equally across them. Otherwise it will only apply to the value that occurs earliest as we scan through the signal (i.e., practically, the last value in the trace as we process the signal backwards). - ''' + """ raise NotImplementedError("_initialize_rnn_cell is not implemented") - def _rnn_cell(self, x, hc, scale=-1, agm=False, distributed=False,**kwargs): - ''' + def _rnn_cell(self, x, hc, scale=-1, agm=False, distributed=False, **kwargs): + """ x: rnn input [batch_size, 1, ...] h0: input rnn hidden state. The hidden state is either a tensor, or a tuple of tensors, depending on the interval chosen. Generally, the hidden state is of size [batch_size, rnn_dim,...] - ''' + """ raise NotImplementedError("_initialize_rnn_cell is not implemented") - def _run_cell(self, x, scale, agm=False, distributed=False): - ''' + """ Run the cell through the trace. - ''' + """ outputs = [] states = [] - hc = self._initialize_rnn_cell(x) # [batch_size, rnn_dim, x_dim] - xs = torch.split(x, 1, dim=1) # time_dim tuple + hc = self._initialize_rnn_cell(x) # [batch_size, rnn_dim, x_dim] + xs = torch.split(x, 1, dim=1) # time_dim tuple time_dim = len(xs) for i in range(time_dim): o, hc = self._rnn_cell(xs[i], hc, scale, agm=agm, distributed=distributed) @@ -333,19 +453,36 @@ def _run_cell(self, x, scale, agm=False, distributed=False): states.append(hc) return outputs, states - - def robustness_trace(self, inputs, pscale=1, scale=-1, keepdim=True, agm=False, distributed=False, **kwargs): + def robustness_trace( + self, + inputs, + pscale=1, + scale=-1, + keepdim=True, + agm=False, + distributed=False, + **kwargs, + ): # Compute the robustness trace of the subformula and that is the input to the temporal operator graph. - trace = self.subformula(inputs, pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs) - outputs, states = self._run_cell(trace, scale=scale, agm=agm, distributed=distributed) - return torch.cat(outputs, dim=1) # [batch_size, time_dim, ...] + trace = self.subformula( + inputs, + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ) + outputs, states = self._run_cell( + trace, scale=scale, agm=agm, distributed=distributed + ) + return torch.cat(outputs, dim=1) # [batch_size, time_dim, ...] def _next_function(self): # next function is the input subformula return [self.subformula] - class Always(Temporal_Operator): def __init__(self, subformula, interval=None): super(Always, self).__init__(subformula=subformula, interval=interval) @@ -353,26 +490,29 @@ def __init__(self, subformula, interval=None): self.oper = "min" def _initialize_rnn_cell(self, x): - ''' + """ Padding is with the last value of the trace - ''' + """ if x.is_cuda: self.M = self.M.cuda() self.b = self.b.cuda() - h0 = torch.ones([x.shape[0], self.rnn_dim, x.shape[2]], device=x.device)*x[:,:1,:] + h0 = ( + torch.ones([x.shape[0], self.rnn_dim, x.shape[2]], device=x.device) + * x[:, :1, :] + ) count = 0.0 # if self.interval is [a, np.inf), then the hidden state is a tuple (like in an LSTM) if (self._interval[1] == np.inf) & (self._interval[0] > 0): - d0 = x[:,:1,:] + d0 = x[:, :1, :] return ((d0, h0.to(x.device)), count) return (h0.to(x.device), count) def _rnn_cell(self, x, hc, scale=-1, agm=False, distributed=False, **kwargs): - ''' + """ x: rnn input [batch_size, 1, ...] hc=(h0, c) h0 is the input rnn hidden state [batch_size, rnn_dim, ...]. c is the count. Initialized by self._initialize_rnn_cell - ''' + """ h0, c = hc if self.operation is None: raise Exception() @@ -380,7 +520,7 @@ def _rnn_cell(self, x, hc, scale=-1, agm=False, distributed=False, **kwargs): if self.interval is None: if distributed: if x == h0: - new_h = (h0 * c + x)/ (c + 1) + new_h = (h0 * c + x) / (c + 1) new_c = c + 1.0 elif x < h0: new_h = x @@ -391,51 +531,61 @@ def _rnn_cell(self, x, hc, scale=-1, agm=False, distributed=False, **kwargs): state = (new_h, new_c) output = new_h else: - input_ = torch.cat([h0, x], dim=1) # [batch_size, rnn_dim+1, x_dim] - output = self.operation(input_, scale, dim=1, keepdim=True, agm=agm) # [batch_size, 1, x_dim] + input_ = torch.cat([h0, x], dim=1) # [batch_size, rnn_dim+1, x_dim] + output = self.operation( + input_, scale, dim=1, keepdim=True, agm=agm + ) # [batch_size, 1, x_dim] state = (output, None) - else: # self.interval is [a, np.inf) + else: # self.interval is [a, np.inf) if (self._interval[1] == np.inf) & (self._interval[0] > 0): d0, h0 = h0 - dh = torch.cat([d0, h0[:,:1,:]], dim=1) # [batch_size, 2, x_dim] - output = self.operation(dh, scale, dim=1, keepdim=True, agm=agm, distributed=distributed) # [batch_size, 1, x_dim] + dh = torch.cat([d0, h0[:, :1, :]], dim=1) # [batch_size, 2, x_dim] + output = self.operation( + dh, scale, dim=1, keepdim=True, agm=agm, distributed=distributed + ) # [batch_size, 1, x_dim] state = ((output, torch.matmul(self.M, h0) + self.b * x), None) - else: # self.interval is [a, b] + else: # self.interval is [a, b] state = (torch.matmul(self.M, h0) + self.b * x, None) - h0x = torch.cat([h0, x], dim=1) # [batch_size, rnn_dim+1, x_dim] - input_ = h0x[:,:self.steps,:] # [batch_size, self.steps, x_dim] - output = self.operation(input_, scale, dim=1, keepdim=True, agm=agm, distributed=distributed) # [batch_size, 1, x_dim] + h0x = torch.cat([h0, x], dim=1) # [batch_size, rnn_dim+1, x_dim] + input_ = h0x[:, : self.steps, :] # [batch_size, self.steps, x_dim] + output = self.operation( + input_, scale, dim=1, keepdim=True, agm=agm, distributed=distributed + ) # [batch_size, 1, x_dim] return output, state def __str__(self): return "◻ " + str(self._interval) + "( " + str(self.subformula) + " )" + class Eventually(Temporal_Operator): - def __init__(self, subformula='Eventually input', interval=None): + def __init__(self, subformula="Eventually input", interval=None): super(Eventually, self).__init__(subformula=subformula, interval=interval) self.operation = Maxish() self.oper = "max" def _initialize_rnn_cell(self, x): - ''' + """ Padding is with the last value of the trace - ''' + """ if x.is_cuda: self.M = self.M.cuda() self.b = self.b.cuda() - h0 = torch.ones([x.shape[0], self.rnn_dim, x.shape[2]], device=x.device)*x[:,:1,:] + h0 = ( + torch.ones([x.shape[0], self.rnn_dim, x.shape[2]], device=x.device) + * x[:, :1, :] + ) count = 0.0 if (self._interval[1] == np.inf) & (self._interval[0] > 0): - d0 = x[:,:1,:] + d0 = x[:, :1, :] return ((d0, h0.to(x.device)), count) return (h0.to(x.device), count) def _rnn_cell(self, x, hc, scale=-1, agm=False, distributed=False, **kwargs): - ''' + """ x: rnn input [batch_size, 1, ...] hc=(h0, c) h0 is the input rnn hidden state [batch_size, rnn_dim, ...]. c is the count. Initialized by self._initialize_rnn_cell - ''' + """ h0, c = hc if self.operation is None: raise Exception() @@ -443,7 +593,7 @@ def _rnn_cell(self, x, hc, scale=-1, agm=False, distributed=False, **kwargs): if self.interval is None: if distributed: if x == h0: - new_h = (h0 * c + x)/ (c + 1) + new_h = (h0 * c + x) / (c + 1) new_c = c + 1.0 elif x > h0: new_h = x @@ -454,50 +604,60 @@ def _rnn_cell(self, x, hc, scale=-1, agm=False, distributed=False, **kwargs): state = (new_h, new_c) output = new_h else: - input_ = torch.cat([h0, x], dim=1) # [batch_size, rnn_dim+1, x_dim] - output = self.operation(input_, scale, dim=1, keepdim=True, agm=agm) # [batch_size, 1, x_dim] + input_ = torch.cat([h0, x], dim=1) # [batch_size, rnn_dim+1, x_dim] + output = self.operation( + input_, scale, dim=1, keepdim=True, agm=agm + ) # [batch_size, 1, x_dim] state = (output, None) - else: # self.interval is [a, np.inf) + else: # self.interval is [a, np.inf) if (self._interval[1] == np.inf) & (self._interval[0] > 0): d0, h0 = h0 - dh = torch.cat([d0, h0[:,:1,:]], dim=1) # [batch_size, 2, x_dim] - output = self.operation(dh, scale, dim=1, keepdim=True, agm=agm, distributed=distributed) # [batch_size, 1, x_dim] + dh = torch.cat([d0, h0[:, :1, :]], dim=1) # [batch_size, 2, x_dim] + output = self.operation( + dh, scale, dim=1, keepdim=True, agm=agm, distributed=distributed + ) # [batch_size, 1, x_dim] state = ((output, torch.matmul(self.M, h0) + self.b * x), None) - else: # self.interval is [a, b] + else: # self.interval is [a, b] state = (torch.matmul(self.M, h0) + self.b * x, None) - h0x = torch.cat([h0, x], dim=1) # [batch_size, rnn_dim+1, x_dim] - input_ = h0x[:,:self.steps,:] # [batch_size, self.steps, x_dim] - output = self.operation(input_, scale, dim=1, keepdim=True, agm=agm, distributed=distributed) # [batch_size, 1, x_dim] + h0x = torch.cat([h0, x], dim=1) # [batch_size, rnn_dim+1, x_dim] + input_ = h0x[:, : self.steps, :] # [batch_size, self.steps, x_dim] + output = self.operation( + input_, scale, dim=1, keepdim=True, agm=agm, distributed=distributed + ) # [batch_size, 1, x_dim] return output, state def __str__(self): return "♢ " + str(self._interval) + "( " + str(self.subformula) + " )" + class LessThan(STL_Formula): - ''' + """ lhs <= val where lhs is the signal, and val is the constant. lhs can be a string or an Expression val can be a float, int, Expression, or tensor. It cannot be a string. - ''' - def __init__(self, lhs='x', val='c'): + """ + + def __init__(self, lhs="x", val="c"): super(LessThan, self).__init__() - assert isinstance(lhs, str) | isinstance(lhs, Expression), "LHS of expression needs to be a string (input name) or Expression" + assert isinstance(lhs, str) | isinstance( + lhs, Expression + ), "LHS of expression needs to be a string (input name) or Expression" assert not isinstance(val, str), "value on the rhs cannot be a string" self.lhs = lhs self.val = val self.subformula = None def robustness_trace(self, trace, pscale=1.0, **kwargs): - ''' + """ Computing robustness trace. pscale scales the robustness by a constant. Default pscale=1. - ''' + """ if isinstance(trace, Expression): trace = trace.value if isinstance(self.val, Expression): - return (self.val.value - trace)*pscale + return (self.val.value - trace) * pscale else: - return (self.val - trace)*pscale + return (self.val - trace) * pscale def _next_function(self): # expects self.lhs to be a string (used for visualizing the graph) @@ -510,7 +670,9 @@ def __str__(self): if isinstance(self.lhs, Expression): lhs_str = self.lhs.name - if isinstance(self.val, str): # could be a string if robustness_trace is never called + if isinstance( + self.val, str + ): # could be a string if robustness_trace is never called return lhs_str + " <= " + self.val if isinstance(self.val, Expression): return lhs_str + " <= " + self.val.name @@ -521,30 +683,33 @@ def __str__(self): class GreaterThan(STL_Formula): - ''' + """ lhs >= val where lhs is the signal, and val is the constant. lhs can be a string or an Expression val can be a float, int, Expression, or tensor. It cannot be a string. - ''' - def __init__(self, lhs='x', val='c'): + """ + + def __init__(self, lhs="x", val="c"): super(GreaterThan, self).__init__() - assert isinstance(lhs, str) | isinstance(lhs, Expression), "LHS of expression needs to be a string (input name) or Expression" + assert isinstance(lhs, str) | isinstance( + lhs, Expression + ), "LHS of expression needs to be a string (input name) or Expression" assert not isinstance(val, str), "value on the rhs cannot be a string" self.lhs = lhs self.val = val self.subformula = None def robustness_trace(self, trace, pscale=1.0, **kwargs): - ''' + """ Computing robustness trace. pscale scales the robustness by a constant. Default pscale=1. - ''' + """ if isinstance(trace, Expression): trace = trace.value if isinstance(self.val, Expression): - return (trace - self.val.value)*pscale + return (trace - self.val.value) * pscale else: - return (trace - self.val)*pscale + return (trace - self.val) * pscale def _next_function(self): # expects self.lhs to be a string (used for visualizing the graph) @@ -557,7 +722,9 @@ def __str__(self): if isinstance(self.lhs, Expression): lhs_str = self.lhs.name - if isinstance(self.val, str): # could be a string if robustness_trace is never called + if isinstance( + self.val, str + ): # could be a string if robustness_trace is never called return lhs_str + " >= " + self.val if isinstance(self.val, Expression): return lhs_str + " >= " + self.val.name @@ -566,31 +733,35 @@ def __str__(self): # if self.value is a single number (e.g., int, or float) return lhs_str + " >= " + str(self.val) + class Equal(STL_Formula): - ''' + """ lhs == val where lhs is the signal, and val is the constant. lhs can be a string or an Expression val can be a float, int, Expression, or tensor. It cannot be a string. - ''' - def __init__(self, lhs='x', val='c'): + """ + + def __init__(self, lhs="x", val="c"): super(Equal, self).__init__() - assert isinstance(lhs, str) | isinstance(lhs, Expression), "LHS of expression needs to be a string (input name) or Expression" + assert isinstance(lhs, str) | isinstance( + lhs, Expression + ), "LHS of expression needs to be a string (input name) or Expression" assert not isinstance(val, str), "value on the rhs cannot be a string" self.lhs = lhs self.val = val self.subformula = None def robustness_trace(self, trace, pscale=1.0, **kwargs): - ''' + """ Computing robustness trace. pscale scales the robustness by a constant. Default pscale=1. - ''' + """ if isinstance(trace, Expression): trace = trace.value if isinstance(self.val, Expression): - return -torch.abs(trace - self.val.value)*pscale + return -torch.abs(trace - self.val.value) * pscale - return -torch.abs(trace - self.val)*pscale + return -torch.abs(trace - self.val) * pscale def _next_function(self): # if isinstance(self.lhs, Expression): @@ -602,7 +773,9 @@ def __str__(self): if isinstance(self.lhs, Expression): lhs_str = self.lhs.name - if isinstance(self.val, str): # could be a string if robustness_trace is never called + if isinstance( + self.val, str + ): # could be a string if robustness_trace is never called return lhs_str + " == " + self.val if isinstance(self.val, Expression): return lhs_str + " == " + self.val.name @@ -611,16 +784,27 @@ def __str__(self): # if self.value is a single number (e.g., int, or float) return lhs_str + " == " + str(self.val) + class Negation(STL_Formula): - ''' + """ not Subformula - ''' + """ + def __init__(self, subformula): super(Negation, self).__init__() self.subformula = subformula - def robustness_trace(self, inputs, pscale=1, scale=-1, keepdim=True, distributed=False, **kwargs): - return -self.subformula(inputs, pscale=pscale, scale=scale, keepdim=keepdim, distributed=distributed, **kwargs) + def robustness_trace( + self, inputs, pscale=1, scale=-1, keepdim=True, distributed=False, **kwargs + ): + return -self.subformula( + inputs, + pscale=pscale, + scale=scale, + keepdim=keepdim, + distributed=distributed, + **kwargs, + ) def _next_function(self): # next function is actually input (traverses the graph backwards) @@ -629,23 +813,51 @@ def _next_function(self): def __str__(self): return "¬(" + str(self.subformula) + ")" + class Implies(STL_Formula): - ''' + """ Implies - ''' + """ + def __init__(self, subformula1, subformula2): super(Implies, self).__init__() self.subformula1 = subformula1 self.subformula2 = subformula2 self.operation = Maxish() - - def robustness_trace(self, inputs, pscale=1, scale=-1, keepdim=True, agm=False, distributed=False, **kwargs): + def robustness_trace( + self, + inputs, + pscale=1, + scale=-1, + keepdim=True, + agm=False, + distributed=False, + **kwargs, + ): x, y = inputs - trace1 = self.subformula1(x, pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs) - trace2 = self.subformula2(y, pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs) - xx = torch.stack([-trace1, trace2], dim=-1) # [batch_size, time_dim, ..., 2] - return self.operation(xx, scale, dim=-1, keepdim=False, agm=agm, distributed=distributed) # [batch_size, time_dim, ...] + trace1 = self.subformula1( + x, + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ) + trace2 = self.subformula2( + y, + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ) + xx = torch.stack([-trace1, trace2], dim=-1) # [batch_size, time_dim, ..., 2] + return self.operation( + xx, scale, dim=-1, keepdim=False, agm=agm, distributed=distributed + ) # [batch_size, time_dim, ...] def _next_function(self): # next function is actually input (traverses the graph backwards) @@ -654,11 +866,13 @@ def _next_function(self): def __str__(self): return "(" + str(self.subformula1) + ") => (" + str(self.subformula2) + ")" + class And(STL_Formula): - ''' + """ inputs: tuple (x,y) where x and y are the inputs to each subformula respectively. x or y can also be a tuple if the subformula requires multiple inputs (e.g, ϕ₁(x) ∧ (ϕ₂(y) ∧ ϕ₃(z)) would have inputs=(x, (y,z))) trace1 and trace2 are size [batch_size, time_dim, x_dim] - ''' + """ + def __init__(self, subformula1, subformula2): super(And, self).__init__() self.subformula1 = subformula1 @@ -666,15 +880,91 @@ def __init__(self, subformula1, subformula2): self.operation = Minish() @staticmethod - def separate_and(formula, input_, pscale=1, scale=-1, keepdim=True, agm=False, distributed=False, **kwargs): + def separate_and( + formula, + input_, + pscale=1, + scale=-1, + keepdim=True, + agm=False, + distributed=False, + **kwargs, + ): if formula.__class__.__name__ != "And": - return formula(input_, pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs).unsqueeze(-1) + return formula( + input_, + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ).unsqueeze(-1) else: - return torch.cat([And.separate_and(formula.subformula1, input_[0], pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs), And.separate_and(formula.subformula2, input_[1], pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs)], axis=-1) - - def robustness_trace(self, inputs, pscale=1, scale=-1, keepdim=True, agm=False, distributed=False, **kwargs): - xx = torch.cat([And.separate_and(self.subformula1, inputs[0], pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs), And.separate_and(self.subformula2, inputs[1], pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs)], axis=-1) - return self.operation(xx, scale, dim=-1, keepdim=False, agm=agm, distributed=distributed) # [batch_size, time_dim, ...] + return torch.cat( + [ + And.separate_and( + formula.subformula1, + input_[0], + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ), + And.separate_and( + formula.subformula2, + input_[1], + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ), + ], + axis=-1, + ) + + def robustness_trace( + self, + inputs, + pscale=1, + scale=-1, + keepdim=True, + agm=False, + distributed=False, + **kwargs, + ): + xx = torch.cat( + [ + And.separate_and( + self.subformula1, + inputs[0], + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ), + And.separate_and( + self.subformula2, + inputs[1], + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ), + ], + axis=-1, + ) + return self.operation( + xx, scale, dim=-1, keepdim=False, agm=agm, distributed=distributed + ) # [batch_size, time_dim, ...] def _next_function(self): # next function is actually input (traverses the graph backwards) @@ -683,11 +973,13 @@ def _next_function(self): def __str__(self): return "(" + str(self.subformula1) + ") ∧ (" + str(self.subformula2) + ")" + class Or(STL_Formula): - ''' + """ inputs: tuple (x,y) where x and y are the inputs to each subformula respectively. x or y can also be a tuple if the subformula requires multiple inputs (e.g, ϕ₁(x) ∨ (ϕ₂(y) ∨ ϕ₃(z)) would have inputs=(x, (y,z))) trace1 and trace2 are size [batch_size, time_dim, x_dim] - ''' + """ + def __init__(self, subformula1, subformula2): super(Or, self).__init__() self.subformula1 = subformula1 @@ -695,15 +987,91 @@ def __init__(self, subformula1, subformula2): self.operation = Maxish() @staticmethod - def separate_or(formula, input_, pscale=1, scale=-1, keepdim=True, agm=False, distributed=False, **kwargs): + def separate_or( + formula, + input_, + pscale=1, + scale=-1, + keepdim=True, + agm=False, + distributed=False, + **kwargs, + ): if formula.__class__.__name__ != "Or": - return formula(input_, pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs).unsqueeze(-1) + return formula( + input_, + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ).unsqueeze(-1) else: - return torch.cat([Or.separate_or(formula.subformula1, input_[0], pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs), Or.separate_or(formula.subformula2, input_[1], pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs)], axis=-1) - - def robustness_trace(self, inputs, pscale=1, scale=-1, keepdim=True, agm=False, distributed=False, **kwargs): - xx = torch.cat([Or.separate_or(self.subformula1, inputs[0], pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs), Or.separate_or(self.subformula2, inputs[1], pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs)], axis=-1) - return self.operation(xx, scale, dim=-1, keepdim=False, agm=agm, distributed=distributed) # [batch_size, time_dim, ...] + return torch.cat( + [ + Or.separate_or( + formula.subformula1, + input_[0], + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ), + Or.separate_or( + formula.subformula2, + input_[1], + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ), + ], + axis=-1, + ) + + def robustness_trace( + self, + inputs, + pscale=1, + scale=-1, + keepdim=True, + agm=False, + distributed=False, + **kwargs, + ): + xx = torch.cat( + [ + Or.separate_or( + self.subformula1, + inputs[0], + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ), + Or.separate_or( + self.subformula2, + inputs[1], + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ), + ], + axis=-1, + ) + return self.operation( + xx, scale, dim=-1, keepdim=False, agm=agm, distributed=distributed + ) # [batch_size, time_dim, ...] def _next_function(self): # next function is actually input (traverses the graph backwards) @@ -712,137 +1080,322 @@ def _next_function(self): def __str__(self): return "(" + str(self.subformula1) + ") ∨ (" + str(self.subformula2) + ")" + class Until(STL_Formula): - def __init__(self, subformula1="Until subformula1", subformula2="Until subformula2", interval=None, overlap=True): - ''' + def __init__( + self, + subformula1="Until subformula1", + subformula2="Until subformula2", + interval=None, + overlap=True, + ): + """ subformula1 U subformula2 (ϕ U ψ) This assumes that ϕ is always true before ψ becomes true. If overlap=True, then the last time step that ϕ is true, ψ starts being true. That is, sₜ ⊧ ϕ and sₜ ⊧ ψ. If overlap=False, when ϕ stops being true, ψ starts being true. That is sₜ ⊧ ϕ and sₜ+₁ ⊧ ψ, but sₜ ¬⊧ ψ - ''' + """ super(Until, self).__init__() self.subformula1 = subformula1 self.subformula2 = subformula2 self.interval = interval if overlap == False: - self.subformula2 = Eventually(subformula=subformula2, interval=[0,1]) - - - def robustness_trace(self, inputs, pscale=1, scale=-1, keepdim=True, agm=False, distributed=False, **kwargs): - ''' + self.subformula2 = Eventually(subformula=subformula2, interval=[0, 1]) + + def robustness_trace( + self, + inputs, + pscale=1, + scale=-1, + keepdim=True, + agm=False, + distributed=False, + **kwargs, + ): + """ trace1 is the robustness trace of ϕ trace2 is the robustness trace of ψ trace1 and trace2 are size [batch_size, time_dim, x_dim] - ''' - assert isinstance(self.subformula1, STL_Formula), "Subformula1 needs to be an stl formula" - assert isinstance(self.subformula2, STL_Formula), "Subformula2 needs to be an stl formula" - LARGE_NUMBER = 1E6 + """ + assert isinstance( + self.subformula1, STL_Formula + ), "Subformula1 needs to be an stl formula" + assert isinstance( + self.subformula2, STL_Formula + ), "Subformula2 needs to be an stl formula" + LARGE_NUMBER = 1e6 interval = self.interval - trace1 = self.subformula1(inputs[0], pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs) - trace2 = self.subformula2(inputs[1], pscale=pscale, scale=scale, keepdim=keepdim, agm=agm, distributed=distributed, **kwargs) + trace1 = self.subformula1( + inputs[0], + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ) + trace2 = self.subformula2( + inputs[1], + pscale=pscale, + scale=scale, + keepdim=keepdim, + agm=agm, + distributed=distributed, + **kwargs, + ) Alw = Always(subformula=Identity(name=str(self.subformula1))) minish = Minish() maxish = Maxish() - LHS = trace2.unsqueeze(-1).repeat([1, 1, 1,trace2.shape[1]]).permute(0, 3, 2, 1) + LHS = ( + trace2.unsqueeze(-1).repeat([1, 1, 1, trace2.shape[1]]).permute(0, 3, 2, 1) + ) if interval == None: - RHS = torch.ones_like(LHS)*-LARGE_NUMBER + RHS = torch.ones_like(LHS) * -LARGE_NUMBER for i in range(trace2.shape[1]): - RHS[:,i:,:,i] = Alw(trace1[:,i:,:]) + RHS[:, i:, :, i] = Alw(trace1[:, i:, :]) return maxish( - minish(torch.stack([LHS, RHS], dim=-1), scale=scale, dim=-1, keepdim=False, agm=agm, distributed=distributed), - scale=scale, dim=-1, keepdim=False, agm=agm, distributed=distributed) + minish( + torch.stack([LHS, RHS], dim=-1), + scale=scale, + dim=-1, + keepdim=False, + agm=agm, + distributed=distributed, + ), + scale=scale, + dim=-1, + keepdim=False, + agm=agm, + distributed=distributed, + ) elif interval[1] < np.Inf: # [a, b] where b < ∞ a = int(interval[0]) b = int(interval[1]) - RHS = [torch.ones_like(trace1)[:,:b,:] * -LARGE_NUMBER] - for i in range(b,trace2.shape[1]): - A = trace2[:,i-b:i-a+1,:].unsqueeze(-1) - relevant = trace1[:,:i+1,:] - B = Alw(relevant.flip(1), scale=scale, keepdim=keepdim, distributed=distributed)[:,a:b+1,:].flip(1).unsqueeze(-1) - RHS.append(maxish(minish(torch.cat([A,B], dim=-1), dim=-1, scale=scale, keepdim=False, distributed=distributed), dim=1, scale=scale, keepdim=keepdim, distributed=distributed)) - return torch.cat(RHS, dim=1); + RHS = [torch.ones_like(trace1)[:, :b, :] * -LARGE_NUMBER] + for i in range(b, trace2.shape[1]): + A = trace2[:, i - b : i - a + 1, :].unsqueeze(-1) + relevant = trace1[:, : i + 1, :] + B = ( + Alw( + relevant.flip(1), + scale=scale, + keepdim=keepdim, + distributed=distributed, + )[:, a : b + 1, :] + .flip(1) + .unsqueeze(-1) + ) + RHS.append( + maxish( + minish( + torch.cat([A, B], dim=-1), + dim=-1, + scale=scale, + keepdim=False, + distributed=distributed, + ), + dim=1, + scale=scale, + keepdim=keepdim, + distributed=distributed, + ) + ) + return torch.cat(RHS, dim=1) else: - a = int(interval[0]) # [a, ∞] where a < ∞ - RHS = [torch.ones_like(trace1)[:,:a,:] * -LARGE_NUMBER] - for i in range(a,trace2.shape[1]): - A = trace2[:,:i-a+1,:].unsqueeze(-1) - relevant = trace1[:,:i+1,:] - B = Alw(relevant.flip(1), scale=scale, keepdim=keepdim, distributed=distributed)[:,a:,:].flip(1).unsqueeze(-1) - RHS.append(maxish(minish(torch.cat([A,B], dim=-1), dim=-1, scale=scale, keepdim=False, distributed=distributed), dim=1, scale=scale, keepdim=keepdim, distributed=distributed)) - return torch.cat(RHS, dim=1); + a = int(interval[0]) # [a, ∞] where a < ∞ + RHS = [torch.ones_like(trace1)[:, :a, :] * -LARGE_NUMBER] + for i in range(a, trace2.shape[1]): + A = trace2[:, : i - a + 1, :].unsqueeze(-1) + relevant = trace1[:, : i + 1, :] + B = ( + Alw( + relevant.flip(1), + scale=scale, + keepdim=keepdim, + distributed=distributed, + )[:, a:, :] + .flip(1) + .unsqueeze(-1) + ) + RHS.append( + maxish( + minish( + torch.cat([A, B], dim=-1), + dim=-1, + scale=scale, + keepdim=False, + distributed=distributed, + ), + dim=1, + scale=scale, + keepdim=keepdim, + distributed=distributed, + ) + ) + return torch.cat(RHS, dim=1) def _next_function(self): # next function is actually input (traverses the graph backwards) return [self.subformula1, self.subformula2] def __str__(self): - return "(" + str(self.subformula1) + ")" + " U " + "(" + str(self.subformula2) + ")" + return ( + "(" + + str(self.subformula1) + + ")" + + " U " + + "(" + + str(self.subformula2) + + ")" + ) -class Then(STL_Formula): +class Then(STL_Formula): def __init__(self, subformula1, subformula2, interval=None, overlap=True): - ''' + """ subformula1 T subformula2 (ϕ U ψ) This assumes that ϕ is eventually true before ψ becomes true. If overlap=True, then the last time step that ϕ is true, ψ starts being true. That is, sₜ ⊧ ϕ and sₜ ⊧ ψ. If overlap=False, when ϕ stops being true, ψ starts being true. That is sₜ ⊧ ϕ and sₜ+₁ ⊧ ψ, but sₜ ¬⊧ ψ - ''' + """ super(Then, self).__init__() self.subformula1 = subformula1 self.subformula2 = subformula2 self.interval = interval if overlap == False: - self.subformula2 = Eventually(subformula=subformula2, interval=[0,1]) - - def robustness_trace(self, inputs, pscale=1, scale=-1, keepdim=True, agm=False, distributed=False, **kwargs): - ''' + self.subformula2 = Eventually(subformula=subformula2, interval=[0, 1]) + + def robustness_trace( + self, + inputs, + pscale=1, + scale=-1, + keepdim=True, + agm=False, + distributed=False, + **kwargs, + ): + """ trace1 is the robustness trace of ϕ trace2 is the robustness trace of ψ trace1 and trace2 are size [batch_size, time_dim, x_dim] - ''' - assert isinstance(self.subformula1, STL_Formula), "Subformula1 needs to be an stl formula" - assert isinstance(self.subformula2, STL_Formula), "Subformula2 needs to be an stl formula" - LARGE_NUMBER = 1E6 + """ + assert isinstance( + self.subformula1, STL_Formula + ), "Subformula1 needs to be an stl formula" + assert isinstance( + self.subformula2, STL_Formula + ), "Subformula2 needs to be an stl formula" + LARGE_NUMBER = 1e6 interval = self.interval trace1 = self.subformula1(inputs[0]) trace2 = self.subformula2(inputs[1]) Ev = Eventually(subformula=Identity(name=str(self.subformula1))) minish = Minish() maxish = Maxish() - LHS = trace2.unsqueeze(-1).repeat([1, 1, 1,trace2.shape[1]]).permute(0, 3, 2, 1) - RHS = torch.ones_like(LHS)*-LARGE_NUMBER + LHS = ( + trace2.unsqueeze(-1).repeat([1, 1, 1, trace2.shape[1]]).permute(0, 3, 2, 1) + ) + RHS = torch.ones_like(LHS) * -LARGE_NUMBER if interval == None: for i in range(trace2.shape[1]): - RHS[:,i:,:,i] = Ev(trace1[:,i:,:]) + RHS[:, i:, :, i] = Ev(trace1[:, i:, :]) return maxish( - minish(torch.stack([LHS, RHS], dim=-1), scale=scale, dim=-1, keepdim=False, agm=agm, distributed=distributed), - scale=scale, dim=-1, keepdim=False, agm=agm, distributed=distributed) + minish( + torch.stack([LHS, RHS], dim=-1), + scale=scale, + dim=-1, + keepdim=False, + agm=agm, + distributed=distributed, + ), + scale=scale, + dim=-1, + keepdim=False, + agm=agm, + distributed=distributed, + ) elif interval[1] < np.Inf: # [a, b] where b < ∞ a = int(interval[0]) b = int(interval[1]) - RHS = [torch.ones_like(trace1)[:,:b,:] * -LARGE_NUMBER] - for i in range(b,trace2.shape[1]): - A = trace2[:,i-b:i-a+1,:].unsqueeze(-1) - relevant = trace1[:,:i+1,:] - B = Ev(relevant.flip(1), scale=scale, keepdim=keepdim, distributed=distributed)[:,a:b+1,:].flip(1).unsqueeze(-1) - RHS.append(maxish(minish(torch.cat([A,B], dim=-1), dim=-1, scale=scale, keepdim=False, distributed=distributed), dim=1, scale=scale, keepdim=keepdim, distributed=distributed)) - return torch.cat(RHS, dim=1); + RHS = [torch.ones_like(trace1)[:, :b, :] * -LARGE_NUMBER] + for i in range(b, trace2.shape[1]): + A = trace2[:, i - b : i - a + 1, :].unsqueeze(-1) + relevant = trace1[:, : i + 1, :] + B = ( + Ev( + relevant.flip(1), + scale=scale, + keepdim=keepdim, + distributed=distributed, + )[:, a : b + 1, :] + .flip(1) + .unsqueeze(-1) + ) + RHS.append( + maxish( + minish( + torch.cat([A, B], dim=-1), + dim=-1, + scale=scale, + keepdim=False, + distributed=distributed, + ), + dim=1, + scale=scale, + keepdim=keepdim, + distributed=distributed, + ) + ) + return torch.cat(RHS, dim=1) else: - a = int(interval[0]) # [a, ∞] where a < ∞ - RHS = [torch.ones_like(trace1)[:,:a,:] * -LARGE_NUMBER] - for i in range(a,trace2.shape[1]): - A = trace2[:,:i-a+1,:].unsqueeze(-1) - relevant = trace1[:,:i+1,:] - B = Ev(relevant.flip(1), scale=scale, keepdim=keepdim, distributed=distributed)[:,a:,:].flip(1).unsqueeze(-1) - RHS.append(maxish(minish(torch.cat([A,B], dim=-1), dim=-1, scale=scale, keepdim=False, distributed=distributed), dim=1, scale=scale, keepdim=keepdim, distributed=distributed)) - return torch.cat(RHS, dim=1); # [batch_size, time_dim, x_dim] + a = int(interval[0]) # [a, ∞] where a < ∞ + RHS = [torch.ones_like(trace1)[:, :a, :] * -LARGE_NUMBER] + for i in range(a, trace2.shape[1]): + A = trace2[:, : i - a + 1, :].unsqueeze(-1) + relevant = trace1[:, : i + 1, :] + B = ( + Ev( + relevant.flip(1), + scale=scale, + keepdim=keepdim, + distributed=distributed, + )[:, a:, :] + .flip(1) + .unsqueeze(-1) + ) + RHS.append( + maxish( + minish( + torch.cat([A, B], dim=-1), + dim=-1, + scale=scale, + keepdim=False, + distributed=distributed, + ), + dim=1, + scale=scale, + keepdim=keepdim, + distributed=distributed, + ) + ) + return torch.cat(RHS, dim=1) # [batch_size, time_dim, x_dim] def _next_function(self): # next function is actually input (traverses the graph backwards) return [self.subformula1, self.subformula2] def __str__(self): - return "(" + str(self.subformula1) + ")" + " T " + "(" + str(self.subformula2) + ")" + return ( + "(" + + str(self.subformula1) + + ")" + + " T " + + "(" + + str(self.subformula2) + + ")" + ) + class Integral1d(STL_Formula): def __init__(self, subformula, interval=None): @@ -857,44 +1410,63 @@ def __init__(self, subformula, interval=None): param.requires_grad = False self.conv.weight /= self.conv.weight - - def construct_padding(self, padding_type, custom_number=100): if self.padding_size is not None: if padding_type == "zero": return torch.zeros([1, self.padding_size, 1]) elif padding_type == "custom": - return torch.ones([1, self.padding_size, 1])*custom_number + return torch.ones([1, self.padding_size, 1]) * custom_number elif padding_type == "same": return torch.ones([1, self.padding_size, 1]) else: return None - def robustness_trace(self, inputs, pscale=1, scale=-1, keepdim=False, use_relu=False, padding_type="same", custom_number=100, integration_scheme="riemann", **kwargs): - - subformula_trace = self.subformula(inputs, pscale=pscale, scale=scale, keepdim=keepdim, **kwargs) - + def robustness_trace( + self, + inputs, + pscale=1, + scale=-1, + keepdim=False, + use_relu=False, + padding_type="same", + custom_number=100, + integration_scheme="riemann", + **kwargs, + ): + subformula_trace = self.subformula( + inputs, pscale=pscale, scale=scale, keepdim=keepdim, **kwargs + ) if self.interval is not None: if integration_scheme == "trapz": - self.conv.weight[:,:,0] /= 2 - self.conv.weight[:,:,-1] /= 2 + self.conv.weight[:, :, 0] /= 2 + self.conv.weight[:, :, -1] /= 2 padding = self.construct_padding(padding_type, custom_number) if subformula_trace.is_cuda: padding = padding.cuda() if padding_type == "same": - padding = padding * subformula_trace[:,0,:] - signal = torch.cat([padding, subformula_trace], dim=1).transpose(1,2) + padding = padding * subformula_trace[:, 0, :] + signal = torch.cat([padding, subformula_trace], dim=1).transpose(1, 2) if use_relu: - return self.conv(torch.relu(signal)).transpose(1,2)[:,:subformula_trace.shape[1],:] + return self.conv(torch.relu(signal)).transpose(1, 2)[ + :, : subformula_trace.shape[1], : + ] else: - return self.conv(signal).transpose(1,2)[:,:subformula_trace.shape[1],:] + return self.conv(signal).transpose(1, 2)[ + :, : subformula_trace.shape[1], : + ] else: if integration_scheme == "trapz": - pad = torch.zeros_like(subformula_trace[:,:1,:]) + pad = torch.zeros_like(subformula_trace[:, :1, :]) if subformula_trace.is_cuda: pad = padding.cuda() - signal = torch.cat([pad, (subformula_trace[:,:-1,:] + subformula_trace[:,1:,:])/2], dim=1) + signal = torch.cat( + [ + pad, + (subformula_trace[:, :-1, :] + subformula_trace[:, 1:, :]) / 2, + ], + dim=1, + ) else: signal = subformula_trace if use_relu == True: @@ -910,13 +1482,13 @@ def __str__(self): return "I" + str(self.interval) + "(" + str(self.subformula) + ")" - class Expression(torch.nn.Module): - ''' + """ Wraps a pytorch arithmetic operation, so that we can intercept and overload comparison operators. Expression allows us to express tensors using their names to make it easier to code up and read, but also keep track of their numeric values. - ''' + """ + def __init__(self, name, value): super(Expression, self).__init__() self.name = name @@ -933,7 +1505,7 @@ def __neg__(self): def __add__(self, other): if isinstance(other, Expression): - return Expression(self.name + '+' + other.name, self.value + other.value) + return Expression(self.name + "+" + other.name, self.value + other.value) else: return Expression(self.name + "+other", self.value + other) @@ -944,11 +1516,10 @@ def __radd__(self, other): def __sub__(self, other): if isinstance(other, Expression): - return Expression(self.name + '-' + other.name, self.value - other.value) + return Expression(self.name + "-" + other.name, self.value - other.value) else: return Expression(self.name + "-other", self.value - other) - def __rsub__(self, other): return Expression(other - self.value) # No need for the case when "other" is an Expression, since that @@ -956,11 +1527,10 @@ def __rsub__(self, other): def __mul__(self, other): if isinstance(other, Expression): - return Expression(self.name + '*' + other.name, self.value * other.value) + return Expression(self.name + "*" + other.name, self.value * other.value) else: return Expression(self.name + "*other", self.value * other) - def __rmul__(self, other): return self.__mul__(other) @@ -968,39 +1538,49 @@ def __truediv__(a, b): # This is the new form required by Python 3 numerator = a denominator = b - num_name = 'num' - denom_name = 'denom' + num_name = "num" + denom_name = "denom" if isinstance(numerator, Expression): num_name = numerator.name numerator = numerator.value if isinstance(denominator, Expression): denom_name = denominator.name denominator = denominator.value - return Expression(num_name + '/' + denom_name, numerator/denominator) + return Expression(num_name + "/" + denom_name, numerator / denominator) # Comparators def __lt__(lhs, rhs): - assert isinstance(lhs, str) | isinstance(lhs, Expression), "LHS of LessThan needs to be a string or Expression" + assert isinstance(lhs, str) | isinstance( + lhs, Expression + ), "LHS of LessThan needs to be a string or Expression" assert not isinstance(rhs, str), "RHS cannot be a string" return LessThan(lhs, rhs) def __le__(lhs, rhs): - assert isinstance(lhs, str) | isinstance(lhs, Expression), "LHS of LessThan needs to be a string or Expression" + assert isinstance(lhs, str) | isinstance( + lhs, Expression + ), "LHS of LessThan needs to be a string or Expression" assert not isinstance(rhs, str), "RHS cannot be a string" return LessThan(lhs, rhs) def __gt__(lhs, rhs): - assert isinstance(lhs, str) | isinstance(lhs, Expression), "LHS of GreaterThan needs to be a string or Expression" + assert isinstance(lhs, str) | isinstance( + lhs, Expression + ), "LHS of GreaterThan needs to be a string or Expression" assert not isinstance(rhs, str), "RHS cannot be a string" return GreaterThan(lhs, rhs) def __ge__(lhs, rhs): - assert isinstance(lhs, str) | isinstance(lhs, Expression), "LHS of GreaterThan needs to be a string or Expression" + assert isinstance(lhs, str) | isinstance( + lhs, Expression + ), "LHS of GreaterThan needs to be a string or Expression" assert not isinstance(rhs, str), "RHS cannot be a string" return GreaterThan(lhs, rhs) def __eq__(lhs, rhs): - assert isinstance(lhs, str) | isinstance(lhs, Expression), "LHS of Equal needs to be a string or Expression" + assert isinstance(lhs, str) | isinstance( + lhs, Expression + ), "LHS of Equal needs to be a string or Expression" assert not isinstance(rhs, str), "RHS cannot be a string" return Equal(lhs, rhs) @@ -1008,4 +1588,4 @@ def __ne__(lhs, rhs): raise NotImplementedError("Not supported yet") def __str__(self): - return str(self.name) \ No newline at end of file + return str(self.name) diff --git a/stlcg/stlviz.py b/stlcg/stlviz.py index 0ea0e43..f24fa8d 100644 --- a/stlcg/stlviz.py +++ b/stlcg/stlviz.py @@ -1,21 +1,29 @@ from collections import namedtuple from distutils.version import LooseVersion -from graphviz import Digraph + +import IPython import torch +from graphviz import Digraph from torch.autograd import Variable -from stlcg import Expression, STL_Formula -import IPython -Node = namedtuple('Node', ('name', 'inputs', 'attr', 'op')) +from stlcg import Expression, STL_Formula -def make_stl_graph(form, node_attr=dict(style='filled', - shape='box', - align='left', - fontsize='12', - ranksep='0.1', - height='0.2'), - graph_attr=dict(size="12,12")): - """ Produces Graphviz representation of PyTorch autograd graph. +Node = namedtuple("Node", ("name", "inputs", "attr", "op")) + + +def make_stl_graph( + form, + node_attr=dict( + style="filled", + shape="box", + align="left", + fontsize="12", + ranksep="0.1", + height="0.2", + ), + graph_attr=dict(size="12,12"), +): + """Produces Graphviz representation of PyTorch autograd graph. Blue nodes are the Variables that require grad, orange are Tensors saved for backward in torch.autograd.Function Args: @@ -34,7 +42,7 @@ def make_stl_graph(form, node_attr=dict(style='filled', seen = set() def size_to_str(size): - return '(' + (', ').join(['%d' % v for v in size]) + ')' + return "(" + (", ").join(["%d" % v for v in size]) + ")" def tensor_to_str(tensor): device = tensor.device.type @@ -61,12 +69,14 @@ def add_nodes(form): elif type(form) == str: dot.node(str(id(form)), form, fillcolor="lightcoral") elif isinstance(form, STL_Formula): - dot.node(str(id(form)), form._get_name() + "\n" + str(form), fillcolor="orange") + dot.node( + str(id(form)), form._get_name() + "\n" + str(form), fillcolor="orange" + ) else: dot.node(str(id(form)), str(form), fillcolor="lightcoral") # recursive call to all the components of the formula - if hasattr(form, '_next_function'): + if hasattr(form, "_next_function"): for u in form._next_function(): dot.edge(str(id(u)), str(id(form))) add_nodes(u) @@ -97,7 +107,6 @@ def add_nodes(form): # dot.edge(str(id(t)), str(id(var))) # add_nodes(t) - # handle multiple outputs if isinstance(form, tuple): for v in form: @@ -122,5 +131,5 @@ def resize_graph(dot, size_per_element=0.15, min_size=12): dot.graph_attr.update(size=size_str) -def save_graph(dot, filename, format='pdf', cleanup=True): - dot.render(filename=filename, format=format, cleanup=cleanup) \ No newline at end of file +def save_graph(dot, filename, format="pdf", cleanup=True): + dot.render(filename=filename, format=format, cleanup=cleanup) diff --git a/stlcg/utils.py b/stlcg/utils.py index 7a34c4d..043bac0 100644 --- a/stlcg/utils.py +++ b/stlcg/utils.py @@ -1,29 +1,34 @@ # -*- coding: utf-8 -*- -import torch import numpy as np +import torch + +LARGE_NUMBER = 1e4 -LARGE_NUMBER = 1E4 def bump(input_tensor, left, right, slope): - ''' + """ creating the bump function σ(s(x-a))(1 - σ(s(x - b))) - ''' - mask = (torch.sigmoid(slope*(input_tensor - left))*(1 - torch.sigmoid(slope*(input_tensor - right)))) - return mask/torch.max(mask) + """ + mask = torch.sigmoid(slope * (input_tensor - left)) * ( + 1 - torch.sigmoid(slope * (input_tensor - right)) + ) + return mask / torch.max(mask) + def bump_transform(oper, input_tensor, mask, scale=1, large_num=LARGE_NUMBER): - ''' + """ non-masked numbers will be a large positive (or negative) number, so we can take the min (or max) properly - ''' + """ sign = 1 if oper == "min" else -1 - sgn = torch.sign(input_tensor) if scale <= 0.0 else torch.tanh(input_tensor*scale) + sgn = torch.sign(input_tensor) if scale <= 0.0 else torch.tanh(input_tensor * scale) return (large_num * sign * (1 - mask) * sgn + mask) * input_tensor + def tensor_to_str(tensor): - ''' + """ turn tensor into a string for printing - ''' + """ device = tensor.device.type req_grad = tensor.requires_grad if req_grad == False: @@ -33,10 +38,11 @@ def tensor_to_str(tensor): tensor = tensor.cpu() return str(tensor.numpy()) + def print_learning_progress(formula, inputs, var_dict, i, loss, scale): vals = [i, loss] string = "iteration: %i -- loss: %.3f" - for (k,v) in var_dict.items(): + for k, v in var_dict.items(): string += " ---- %s:%.3f" vals.append(k) vals.append(v) @@ -44,4 +50,4 @@ def print_learning_progress(formula, inputs, var_dict, i, loss, scale): vals.append(scale) string += " ---- true value:%.3f" vals.append(formula.robustness(inputs).detach().numpy()) - print(string%tuple(vals)) \ No newline at end of file + print(string % tuple(vals)) From 6d5c4c97acf8fcfd81fc2decb7dfb9ffd4c7d52b Mon Sep 17 00:00:00 2001 From: Anand Balakrishnan Date: Mon, 19 Feb 2024 04:23:57 -0800 Subject: [PATCH 3/4] refactor: move stlcg accompanied modules into package --- demo.ipynb | 4 ++-- examples/VAE.ipynb | 2 +- stlcg/__init__.py | 41 +++++++++++++++++++++++++++++++++++++++++ stlcg/stlcg.py | 2 +- 4 files changed, 45 insertions(+), 4 deletions(-) diff --git a/demo.ipynb b/demo.ipynb index c1c7409..f6a766e 100644 --- a/demo.ipynb +++ b/demo.ipynb @@ -18,9 +18,9 @@ "outputs": [], "source": [ "import stlcg\n", - "import stlviz as viz\n", + "import stlcg.stlviz as viz\n", "from stlcg import Expression\n", - "from utils import print_learning_progress" + "from stlcg.utils import print_learning_progress" ] }, { diff --git a/examples/VAE.ipynb b/examples/VAE.ipynb index 6779465..1d48b72 100644 --- a/examples/VAE.ipynb +++ b/examples/VAE.ipynb @@ -16,7 +16,7 @@ "import numpy as np\n", "from abc import ABC, abstractmethod\n", "import stlcg \n", - "import stlviz \n", + "import stlcg.stlviz as stlviz \n", "\n", "import matplotlib.pyplot as plt\n", "from ipywidgets import interact, interactive, fixed, interact_manual\n", diff --git a/stlcg/__init__.py b/stlcg/__init__.py index e69de29..13b03d5 100644 --- a/stlcg/__init__.py +++ b/stlcg/__init__.py @@ -0,0 +1,41 @@ +from .stlcg import ( + Always, + And, + Equal, + Eventually, + Expression, + GreaterThan, + Identity, + Implies, + Integral1d, + LessThan, + Maxish, + Minish, + Negation, + Or, + STL_Formula, + Temporal_Operator, + Then, + Until, +) + +__all__ = [ + "Always", + "And", + "Equal", + "Eventually", + "Expression", + "GreaterThan", + "Identity", + "Implies", + "Integral1d", + "LessThan", + "Maxish", + "Minish", + "Negation", + "Or", + "STL_Formula", + "Temporal_Operator", + "Then", + "Until", +] diff --git a/stlcg/stlcg.py b/stlcg/stlcg.py index 4deb869..7c83911 100644 --- a/stlcg/stlcg.py +++ b/stlcg/stlcg.py @@ -3,7 +3,7 @@ import torch # import IPython -# from utils import tensor_to_str +# from stlcg.utils import tensor_to_str """ Important information: - This has the option to use an arithmetic-geometric mean robustness metric: https://arxiv.org/pdf/1903.05186.pdf. The default is not to use it. But this is still being tested. From 7cabe3be2f8f8b0c344371e5f5c01f628d6fcda9 Mon Sep 17 00:00:00 2001 From: Anand Balakrishnan Date: Mon, 19 Feb 2024 04:26:33 -0800 Subject: [PATCH 4/4] refactor: move stlcg package into src-layout --- pyproject.toml | 4 ++-- setup.py | 0 {stlcg => src/stlcg}/__init__.py | 0 {stlcg => src/stlcg}/logger.py | 0 {stlcg => src/stlcg}/stlcg.py | 0 {stlcg => src/stlcg}/stlviz.py | 0 {stlcg => src/stlcg}/utils.py | 0 7 files changed, 2 insertions(+), 2 deletions(-) delete mode 100644 setup.py rename {stlcg => src/stlcg}/__init__.py (100%) rename {stlcg => src/stlcg}/logger.py (100%) rename {stlcg => src/stlcg}/stlcg.py (100%) rename {stlcg => src/stlcg}/stlviz.py (100%) rename {stlcg => src/stlcg}/utils.py (100%) diff --git a/pyproject.toml b/pyproject.toml index a56680c..51a6dba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,8 +21,8 @@ classifiers = [ [project.urls] repository = "https://github.com/StanfordASL/stlcg" -[tool.setuptools] -packages = ["stlcg"] +[tool.setuptools.packages.find] +where = ["src"] [tool.isort] profile = "black" diff --git a/setup.py b/setup.py deleted file mode 100644 index e69de29..0000000 diff --git a/stlcg/__init__.py b/src/stlcg/__init__.py similarity index 100% rename from stlcg/__init__.py rename to src/stlcg/__init__.py diff --git a/stlcg/logger.py b/src/stlcg/logger.py similarity index 100% rename from stlcg/logger.py rename to src/stlcg/logger.py diff --git a/stlcg/stlcg.py b/src/stlcg/stlcg.py similarity index 100% rename from stlcg/stlcg.py rename to src/stlcg/stlcg.py diff --git a/stlcg/stlviz.py b/src/stlcg/stlviz.py similarity index 100% rename from stlcg/stlviz.py rename to src/stlcg/stlviz.py diff --git a/stlcg/utils.py b/src/stlcg/utils.py similarity index 100% rename from stlcg/utils.py rename to src/stlcg/utils.py