Skip to content

Commit

Permalink
Simulate quadrotor with CLF/CBF controller.
Browse files Browse the repository at this point in the history
Also draw the incompatible region for the nonlinear toy example.
  • Loading branch information
hongkai-dai committed Aug 26, 2024
1 parent 13615e7 commit a7708f8
Show file tree
Hide file tree
Showing 8 changed files with 472 additions and 63 deletions.
5 changes: 3 additions & 2 deletions compatible_clf_cbf/cbf.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,9 @@ def __init__(

def add_to_prog(
self, prog: solvers.MathematicalProgram, x_val: np.ndarray, u: np.ndarray
):
) -> solvers.Binding[solvers.LinearConstraint]:
env = {self.x[i]: x_val[i] for i in range(x_val.size)}
lhs_coeff = np.array([p.Evaluate(env) for p in self.lhs_coeff])
rhs = self.rhs.Evaluate(env)
prog.AddLinearConstraint(lhs_coeff, rhs, np.inf, u)
constraint = prog.AddLinearConstraint(lhs_coeff, rhs, np.inf, u)
return constraint
5 changes: 3 additions & 2 deletions compatible_clf_cbf/clf.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,9 @@ def __init__(

def add_to_prog(
self, prog: solvers.MathematicalProgram, x_val: np.ndarray, u: np.ndarray
):
) -> solvers.Binding[solvers.LinearConstraint]:
env = {self.x[i]: x_val[i] for i in range(x_val.size)}
lhs_coeff = np.array([p.Evaluate(env) for p in self.lhs_coeff])
rhs = self.rhs.Evaluate(env)
prog.AddLinearConstraint(lhs_coeff, -np.inf, rhs, u)
constraint = prog.AddLinearConstraint(lhs_coeff, -np.inf, rhs, u)
return constraint
96 changes: 96 additions & 0 deletions compatible_clf_cbf/controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from typing import Optional

import numpy as np

import pydrake.solvers as solvers
import pydrake.symbolic as sym
import pydrake.systems.framework

import compatible_clf_cbf.clf
import compatible_clf_cbf.cbf

import compatible_clf_cbf.utils


class ClfCbfController(pydrake.systems.framework.LeafSystem):
def __init__(
self,
f: np.ndarray,
g: np.ndarray,
V: sym.Polynomial,
b: np.ndarray,
x: np.ndarray,
kappa_V: float,
kappa_b: np.ndarray,
Qu: np.ndarray,
Au: Optional[np.ndarray],
bu: Optional[np.ndarray],
solver_id: Optional[solvers.SolverId],
solver_options: Optional[solvers.SolverOptions],
):
super().__init__()
self.nu = g.shape[1]
self.DeclareVectorInputPort("state", x.size)
self.action_output_index = self.DeclareVectorOutputPort(
"action", self.nu, self.calc_action
).get_index()
self.V = V
self.b = b
self.x = x
self.V_output_index = self.DeclareVectorOutputPort(
"V", 1, self.calc_V
).get_index()
self.b_output_index = self.DeclareVectorOutputPort(
"b", b.size, self.calc_b
).get_index()
self.clf_constraint = compatible_clf_cbf.clf.ClfConstraint(V, f, g, x, kappa_V)
self.cbf_constraint = [
compatible_clf_cbf.cbf.CbfConstraint(b[i], f, g, x, kappa_b[i])
for i in range(b.size)
]
self.Qu = Qu
self.Au = Au
self.bu = bu
self.solver_id = solver_id
self.solver_options = solver_options

def action_output_port(self):
return self.get_output_port(self.action_output_index)

def clf_output_port(self):
return self.get_output_port(self.V_output_index)

def cbf_output_port(self):
return self.get_output_port(self.b_output_index)

def calc_action(self, context: pydrake.systems.framework.Context, output):
x_val: np.ndarray = self.get_input_port(0).Eval(context)
prog = solvers.MathematicalProgram()
u = prog.NewContinuousVariables(self.nu, "u")
prog.AddQuadraticCost(self.Qu, np.zeros((self.nu,)), u, is_convex=True)
self.clf_constraint.add_to_prog(prog, x_val, u)
for cbf_cnstr in self.cbf_constraint:
cbf_cnstr.add_to_prog(prog, x_val, u)
if self.Au is not None:
assert self.bu is not None
prog.AddLinearConstraint(
self.Au, np.full_like(self.bu, -np.inf), self.bu, u
)
result = compatible_clf_cbf.utils.solve_with_id(
prog, self.solver_id, self.solver_options
)
assert result.is_success()
u_val = result.GetSolution(u)
output.set_value(u_val)

def calc_V(self, context: pydrake.systems.framework.Context, output):
x_val: np.ndarray = self.get_input_port(0).Eval(context)
env = {self.x[i]: x_val[i] for i in range(self.x.size)}
V_val = self.V.Evaluate(env)
output.set_value(np.array([V_val]))

def calc_b(self, context: pydrake.systems.framework.Context, output):
x_val: np.ndarray = self.get_input_port(0).Eval(context)
env = {self.x[i]: x_val[i] for i in range(self.x.size)}
b_val = np.array([b_i.Evaluate(env) for b_i in self.b])
output.set_value(b_val)
115 changes: 77 additions & 38 deletions examples/nonlinear_toy/demo_trigpoly.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,24 @@
import pydrake.solvers as solvers

import compatible_clf_cbf.clf_cbf as clf_cbf
import compatible_clf_cbf.utils as utils
import compatible_clf_cbf.utils as utils # noqa
import examples.nonlinear_toy.toy_system as toy_system
import examples.nonlinear_toy.incompatibility


def get_grid_pts():
grid_theta, grid_x2 = np.meshgrid(
np.linspace(-np.pi, np.pi, 100), np.linspace(-3, 4, 100)
)
grid_x_vals = np.concatenate(
(
np.sin(grid_theta.reshape((1, -1))),
np.cos(grid_theta.reshape((1, -1))) - 1,
grid_x2.reshape((1, -1)),
),
axis=0,
)
return grid_theta, grid_x2, grid_x_vals


def plot_clf_cbf(
Expand All @@ -37,37 +53,23 @@ def plot_clf_cbf(
Args:
fill_compatible: Fill in the compatible region.
"""
grid_theta, grid_theta_dot = np.meshgrid(
np.linspace(-np.pi, np.pi, 100), np.linspace(-3, 3, 100)
)
grid_x_vals = np.concatenate(
(
np.sin(grid_theta.reshape((1, -1))),
np.cos(grid_theta.reshape((1, -1))) - 1,
grid_theta_dot.reshape((1, -1)),
),
axis=0,
)
grid_theta, grid_x2, grid_x_vals = get_grid_pts()
grid_V = V.EvaluateIndeterminates(x, grid_x_vals).reshape(grid_theta.shape)
grid_b = b[0].EvaluateIndeterminates(x, grid_x_vals).reshape(grid_theta.shape)
h_V = ax.contour(
grid_theta, grid_theta_dot, grid_V, levels=np.array([1]), colors="red"
)
h_b = ax.contour(
grid_theta, grid_theta_dot, grid_b, levels=np.array([0]), colors="blue"
)
h_V = ax.contour(grid_theta, grid_x2, grid_V, levels=np.array([1]), colors="red")
h_b = ax.contour(grid_theta, grid_x2, grid_b, levels=np.array([0]), colors="blue")

if fill_compatible:
# Fill in the region {x|V(x)<=1, b(x) >= 0}, namely
# {x | max(V(x)-1, -b(x)) <= 0}.
grid_fill_vals = np.maximum(grid_V - 1, -grid_b)
h_compatible = ax.contourf(
grid_theta,
grid_theta_dot,
grid_x2,
grid_fill_vals,
levels=[-np.inf, 0],
colors="green",
alpha=0.2,
alpha=0.4,
)
else:
h_compatible = None
Expand All @@ -82,17 +84,7 @@ def get_unsafe_regions(x: np.ndarray) -> np.ndarray:
def plot_unsafe_regions(ax: matplotlib.axes.Axes):
x = sym.MakeVectorContinuousVariable(3, "x")
unsafe_regions = get_unsafe_regions(x)
grid_theta, grid_theta_dot = np.meshgrid(
np.linspace(-np.pi, np.pi, 100), np.linspace(-3, 3, 100)
)
grid_x_vals = np.concatenate(
(
np.sin(grid_theta.reshape((1, -1))),
np.cos(grid_theta.reshape((1, -1))) - 1,
grid_theta_dot.reshape((1, -1)),
),
axis=0,
)
grid_theta, grid_x2, grid_x_vals = get_grid_pts()
unsafe_values = (
np.concatenate(
[
Expand All @@ -106,7 +98,7 @@ def plot_unsafe_regions(ax: matplotlib.axes.Axes):
)
h_unsafe = ax.contourf(
grid_theta,
grid_theta_dot,
grid_x2,
unsafe_values,
levels=np.array([-np.inf, 0.0]),
alpha=0.5,
Expand Down Expand Up @@ -167,15 +159,17 @@ def search(unit_test_flag: bool = False):
within=None,
)
]
kappa_V = 0.01
kappa_b = np.array([0.01])
kappa_V = 0.1
kappa_b = np.array([0.1])
barrier_eps = np.array([0.001])

x_equilibrium = np.array([0, 0.0, 0.0])

clf_degree = 2
cbf_degrees = [2]
max_iter = 20
# Somehow the code runs into error after 13 iterations on the CI, but is
# fine on my laptop for 20 iterations.
max_iter = 10 if unit_test_flag else 20

binary_search_scale_options = None
inner_ellipsoid_options = None
Expand All @@ -185,6 +179,9 @@ def search(unit_test_flag: bool = False):
[np.sin(-np.pi / 3), np.cos(-np.pi / 3) - 1, -0.5],
[np.sin(0), np.cos(0) - 1, -1.5],
[np.sin(np.pi / 2), np.cos(np.pi / 2) - 1, -1.9],
[np.sin(0), np.cos(0) - 1, 2],
[np.sin(0.75 * np.pi), np.cos(0.75 * np.pi) - 1, 1],
[np.sin(-0.8 * np.pi), np.cos(-0.8 * np.pi) - 1, 1],
]
),
anchor_states=np.array([[0.0, 0, 0]]),
Expand Down Expand Up @@ -212,7 +209,7 @@ def search(unit_test_flag: bool = False):
binary_search_scale_options=binary_search_scale_options,
compatible_states_options=compatible_states_options,
solver_options=solver_options,
backoff_scale=utils.BackoffScale(rel=None, abs=0.01),
# backoff_scale=utils.BackoffScale(rel=None, abs=0.005),
)
print(f"V={V}")
print(f"b={b}")
Expand All @@ -233,6 +230,40 @@ def search(unit_test_flag: bool = False):
return V, b


def plot_incompatible(
ax: matplotlib.axes.Axes,
V: sym.Polynomial,
b: sym.Polynomial,
x: np.ndarray,
kappa_V: float,
kappa_b: float,
):
grid_theta, grid_x2, grid_x_vals = get_grid_pts()

compute_incompatible = (
examples.nonlinear_toy.incompatibility.ComputeIncompatibility(
V, b, x, kappa_V, kappa_b, u_min=-1, u_max=1
)
)

grid_incompatible_vals = np.array(
[
compute_incompatible.eval(grid_x_vals[:, i])
for i in range(grid_x_vals.shape[1])
]
).reshape(grid_theta.shape)

h_incompatible = ax.contourf(
grid_theta,
grid_x2,
grid_incompatible_vals,
levels=[0, np.inf],
colors="cyan",
alpha=0.4,
)
return h_incompatible


def visualize():
x = sym.MakeVectorContinuousVariable(3, "x")
f, g = toy_system.affine_trig_poly_dynamics(x)
Expand All @@ -245,7 +276,7 @@ def visualize():
fig = plt.figure()
ax = fig.add_subplot()
ax.set_xlabel(r"$\theta$ (rad)", fontsize=16)
ax.set_ylabel(r"$\dot{\theta}$ (rad/s)", fontsize=16)
ax.set_ylabel(r"$\gamma$", fontsize=16)
ax.set_xticks([-np.pi, -np.pi / 2, 0, np.pi / 2, np.pi])
ax.set_xticklabels(
[r"$-\pi$", r"$-\frac{\pi}{2}$", r"0", r"$\frac{\pi}{2}$", r"$\pi$"]
Expand All @@ -256,6 +287,14 @@ def visualize():
)
h_V_init, h_b_init = plot_clf_cbf_init(ax)
plot_unsafe_regions(ax)
h_incompatible = plot_incompatible( # noqa
ax,
saved_data["V"],
saved_data["b"][0],
x,
saved_data["kappa_V"],
saved_data["kappa_b"][0],
)
ax.legend(
[
h_V.legend_elements()[0][0],
Expand All @@ -267,7 +306,7 @@ def visualize():
prop={"size": 12},
)
fig.show()
for fig_extension in (".png", "pdf"):
for fig_extension in (".png", ".pdf"):
fig.savefig(
os.path.join(
os.path.dirname(os.path.abspath(__file__)),
Expand Down
Loading

0 comments on commit a7708f8

Please sign in to comment.