Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simulate quadrotor with CLF/CBF controller. #64

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions compatible_clf_cbf/cbf.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,9 @@ def __init__(

def add_to_prog(
self, prog: solvers.MathematicalProgram, x_val: np.ndarray, u: np.ndarray
):
) -> solvers.Binding[solvers.LinearConstraint]:
env = {self.x[i]: x_val[i] for i in range(x_val.size)}
lhs_coeff = np.array([p.Evaluate(env) for p in self.lhs_coeff])
rhs = self.rhs.Evaluate(env)
prog.AddLinearConstraint(lhs_coeff, rhs, np.inf, u)
constraint = prog.AddLinearConstraint(lhs_coeff, rhs, np.inf, u)
return constraint
5 changes: 3 additions & 2 deletions compatible_clf_cbf/clf.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,9 @@ def __init__(

def add_to_prog(
self, prog: solvers.MathematicalProgram, x_val: np.ndarray, u: np.ndarray
):
) -> solvers.Binding[solvers.LinearConstraint]:
env = {self.x[i]: x_val[i] for i in range(x_val.size)}
lhs_coeff = np.array([p.Evaluate(env) for p in self.lhs_coeff])
rhs = self.rhs.Evaluate(env)
prog.AddLinearConstraint(lhs_coeff, -np.inf, rhs, u)
constraint = prog.AddLinearConstraint(lhs_coeff, -np.inf, rhs, u)
return constraint
96 changes: 96 additions & 0 deletions compatible_clf_cbf/controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from typing import Optional

import numpy as np

import pydrake.solvers as solvers
import pydrake.symbolic as sym
import pydrake.systems.framework

import compatible_clf_cbf.clf
import compatible_clf_cbf.cbf

import compatible_clf_cbf.utils


class ClfCbfController(pydrake.systems.framework.LeafSystem):
def __init__(
self,
f: np.ndarray,
g: np.ndarray,
V: sym.Polynomial,
b: np.ndarray,
x: np.ndarray,
kappa_V: float,
kappa_b: np.ndarray,
Qu: np.ndarray,
Au: Optional[np.ndarray],
bu: Optional[np.ndarray],
solver_id: Optional[solvers.SolverId],
solver_options: Optional[solvers.SolverOptions],
):
super().__init__()
self.nu = g.shape[1]
self.DeclareVectorInputPort("state", x.size)
self.action_output_index = self.DeclareVectorOutputPort(
"action", self.nu, self.calc_action
).get_index()
self.V = V
self.b = b
self.x = x
self.V_output_index = self.DeclareVectorOutputPort(
"V", 1, self.calc_V
).get_index()
self.b_output_index = self.DeclareVectorOutputPort(
"b", b.size, self.calc_b
).get_index()
self.clf_constraint = compatible_clf_cbf.clf.ClfConstraint(V, f, g, x, kappa_V)
self.cbf_constraint = [
compatible_clf_cbf.cbf.CbfConstraint(b[i], f, g, x, kappa_b[i])
for i in range(b.size)
]
self.Qu = Qu
self.Au = Au
self.bu = bu
self.solver_id = solver_id
self.solver_options = solver_options

def action_output_port(self):
return self.get_output_port(self.action_output_index)

def clf_output_port(self):
return self.get_output_port(self.V_output_index)

def cbf_output_port(self):
return self.get_output_port(self.b_output_index)

def calc_action(self, context: pydrake.systems.framework.Context, output):
x_val: np.ndarray = self.get_input_port(0).Eval(context)
prog = solvers.MathematicalProgram()
u = prog.NewContinuousVariables(self.nu, "u")
prog.AddQuadraticCost(self.Qu, np.zeros((self.nu,)), u, is_convex=True)
self.clf_constraint.add_to_prog(prog, x_val, u)
for cbf_cnstr in self.cbf_constraint:
cbf_cnstr.add_to_prog(prog, x_val, u)
if self.Au is not None:
assert self.bu is not None
prog.AddLinearConstraint(
self.Au, np.full_like(self.bu, -np.inf), self.bu, u
)
result = compatible_clf_cbf.utils.solve_with_id(
prog, self.solver_id, self.solver_options
)
assert result.is_success()
u_val = result.GetSolution(u)
output.set_value(u_val)

def calc_V(self, context: pydrake.systems.framework.Context, output):
x_val: np.ndarray = self.get_input_port(0).Eval(context)
env = {self.x[i]: x_val[i] for i in range(self.x.size)}
V_val = self.V.Evaluate(env)
output.set_value(np.array([V_val]))

def calc_b(self, context: pydrake.systems.framework.Context, output):
x_val: np.ndarray = self.get_input_port(0).Eval(context)
env = {self.x[i]: x_val[i] for i in range(self.x.size)}
b_val = np.array([b_i.Evaluate(env) for b_i in self.b])
output.set_value(b_val)
115 changes: 77 additions & 38 deletions examples/nonlinear_toy/demo_trigpoly.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,24 @@
import pydrake.solvers as solvers

import compatible_clf_cbf.clf_cbf as clf_cbf
import compatible_clf_cbf.utils as utils
import compatible_clf_cbf.utils as utils # noqa
import examples.nonlinear_toy.toy_system as toy_system
import examples.nonlinear_toy.incompatibility


def get_grid_pts():
grid_theta, grid_x2 = np.meshgrid(
np.linspace(-np.pi, np.pi, 100), np.linspace(-3, 4, 100)
)
grid_x_vals = np.concatenate(
(
np.sin(grid_theta.reshape((1, -1))),
np.cos(grid_theta.reshape((1, -1))) - 1,
grid_x2.reshape((1, -1)),
),
axis=0,
)
return grid_theta, grid_x2, grid_x_vals


def plot_clf_cbf(
Expand All @@ -37,37 +53,23 @@ def plot_clf_cbf(
Args:
fill_compatible: Fill in the compatible region.
"""
grid_theta, grid_theta_dot = np.meshgrid(
np.linspace(-np.pi, np.pi, 100), np.linspace(-3, 3, 100)
)
grid_x_vals = np.concatenate(
(
np.sin(grid_theta.reshape((1, -1))),
np.cos(grid_theta.reshape((1, -1))) - 1,
grid_theta_dot.reshape((1, -1)),
),
axis=0,
)
grid_theta, grid_x2, grid_x_vals = get_grid_pts()
grid_V = V.EvaluateIndeterminates(x, grid_x_vals).reshape(grid_theta.shape)
grid_b = b[0].EvaluateIndeterminates(x, grid_x_vals).reshape(grid_theta.shape)
h_V = ax.contour(
grid_theta, grid_theta_dot, grid_V, levels=np.array([1]), colors="red"
)
h_b = ax.contour(
grid_theta, grid_theta_dot, grid_b, levels=np.array([0]), colors="blue"
)
h_V = ax.contour(grid_theta, grid_x2, grid_V, levels=np.array([1]), colors="red")
h_b = ax.contour(grid_theta, grid_x2, grid_b, levels=np.array([0]), colors="blue")

if fill_compatible:
# Fill in the region {x|V(x)<=1, b(x) >= 0}, namely
# {x | max(V(x)-1, -b(x)) <= 0}.
grid_fill_vals = np.maximum(grid_V - 1, -grid_b)
h_compatible = ax.contourf(
grid_theta,
grid_theta_dot,
grid_x2,
grid_fill_vals,
levels=[-np.inf, 0],
colors="green",
alpha=0.2,
alpha=0.4,
)
else:
h_compatible = None
Expand All @@ -82,17 +84,7 @@ def get_unsafe_regions(x: np.ndarray) -> np.ndarray:
def plot_unsafe_regions(ax: matplotlib.axes.Axes):
x = sym.MakeVectorContinuousVariable(3, "x")
unsafe_regions = get_unsafe_regions(x)
grid_theta, grid_theta_dot = np.meshgrid(
np.linspace(-np.pi, np.pi, 100), np.linspace(-3, 3, 100)
)
grid_x_vals = np.concatenate(
(
np.sin(grid_theta.reshape((1, -1))),
np.cos(grid_theta.reshape((1, -1))) - 1,
grid_theta_dot.reshape((1, -1)),
),
axis=0,
)
grid_theta, grid_x2, grid_x_vals = get_grid_pts()
unsafe_values = (
np.concatenate(
[
Expand All @@ -106,7 +98,7 @@ def plot_unsafe_regions(ax: matplotlib.axes.Axes):
)
h_unsafe = ax.contourf(
grid_theta,
grid_theta_dot,
grid_x2,
unsafe_values,
levels=np.array([-np.inf, 0.0]),
alpha=0.5,
Expand Down Expand Up @@ -167,15 +159,17 @@ def search(unit_test_flag: bool = False):
within=None,
)
]
kappa_V = 0.01
kappa_b = np.array([0.01])
kappa_V = 0.1
kappa_b = np.array([0.1])
barrier_eps = np.array([0.001])

x_equilibrium = np.array([0, 0.0, 0.0])

clf_degree = 2
cbf_degrees = [2]
max_iter = 20
# Somehow the code runs into error after 13 iterations on the CI, but is
# fine on my laptop for 20 iterations.
max_iter = 10 if unit_test_flag else 20

binary_search_scale_options = None
inner_ellipsoid_options = None
Expand All @@ -185,6 +179,9 @@ def search(unit_test_flag: bool = False):
[np.sin(-np.pi / 3), np.cos(-np.pi / 3) - 1, -0.5],
[np.sin(0), np.cos(0) - 1, -1.5],
[np.sin(np.pi / 2), np.cos(np.pi / 2) - 1, -1.9],
[np.sin(0), np.cos(0) - 1, 2],
[np.sin(0.75 * np.pi), np.cos(0.75 * np.pi) - 1, 1],
[np.sin(-0.8 * np.pi), np.cos(-0.8 * np.pi) - 1, 1],
]
),
anchor_states=np.array([[0.0, 0, 0]]),
Expand Down Expand Up @@ -212,7 +209,7 @@ def search(unit_test_flag: bool = False):
binary_search_scale_options=binary_search_scale_options,
compatible_states_options=compatible_states_options,
solver_options=solver_options,
backoff_scale=utils.BackoffScale(rel=None, abs=0.01),
# backoff_scale=utils.BackoffScale(rel=None, abs=0.005),
)
print(f"V={V}")
print(f"b={b}")
Expand All @@ -233,6 +230,40 @@ def search(unit_test_flag: bool = False):
return V, b


def plot_incompatible(
ax: matplotlib.axes.Axes,
V: sym.Polynomial,
b: sym.Polynomial,
x: np.ndarray,
kappa_V: float,
kappa_b: float,
):
grid_theta, grid_x2, grid_x_vals = get_grid_pts()

compute_incompatible = (
examples.nonlinear_toy.incompatibility.ComputeIncompatibility(
V, b, x, kappa_V, kappa_b, u_min=-1, u_max=1
)
)

grid_incompatible_vals = np.array(
[
compute_incompatible.eval(grid_x_vals[:, i])
for i in range(grid_x_vals.shape[1])
]
).reshape(grid_theta.shape)

h_incompatible = ax.contourf(
grid_theta,
grid_x2,
grid_incompatible_vals,
levels=[0, np.inf],
colors="cyan",
alpha=0.4,
)
return h_incompatible


def visualize():
x = sym.MakeVectorContinuousVariable(3, "x")
f, g = toy_system.affine_trig_poly_dynamics(x)
Expand All @@ -245,7 +276,7 @@ def visualize():
fig = plt.figure()
ax = fig.add_subplot()
ax.set_xlabel(r"$\theta$ (rad)", fontsize=16)
ax.set_ylabel(r"$\dot{\theta}$ (rad/s)", fontsize=16)
ax.set_ylabel(r"$\gamma$", fontsize=16)
ax.set_xticks([-np.pi, -np.pi / 2, 0, np.pi / 2, np.pi])
ax.set_xticklabels(
[r"$-\pi$", r"$-\frac{\pi}{2}$", r"0", r"$\frac{\pi}{2}$", r"$\pi$"]
Expand All @@ -256,6 +287,14 @@ def visualize():
)
h_V_init, h_b_init = plot_clf_cbf_init(ax)
plot_unsafe_regions(ax)
h_incompatible = plot_incompatible( # noqa
ax,
saved_data["V"],
saved_data["b"][0],
x,
saved_data["kappa_V"],
saved_data["kappa_b"][0],
)
ax.legend(
[
h_V.legend_elements()[0][0],
Expand All @@ -267,7 +306,7 @@ def visualize():
prop={"size": 12},
)
fig.show()
for fig_extension in (".png", "pdf"):
for fig_extension in (".png", ".pdf"):
fig.savefig(
os.path.join(
os.path.dirname(os.path.abspath(__file__)),
Expand Down
Loading