Skip to content

Commit

Permalink
Add a demo on quadrotor. (#57)
Browse files Browse the repository at this point in the history
Use polynomial dynamics for quadrotor. The numerics isn't very good.
  • Loading branch information
hongkai-dai authored Aug 13, 2024
1 parent 04e1775 commit e37a559
Show file tree
Hide file tree
Showing 12 changed files with 752 additions and 52 deletions.
39 changes: 38 additions & 1 deletion compatible_clf_cbf/clf.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
"""

from dataclasses import dataclass
import os
import pickle
from typing import List, Optional, Tuple, Union
from typing_extensions import Self

Expand All @@ -20,6 +22,7 @@
new_sos_polynomial,
solve_with_id,
)
import compatible_clf_cbf.utils


@dataclass
Expand Down Expand Up @@ -571,4 +574,38 @@ def find_candidate_regional_lyapunov(
prog.AddSosConstraint(derivative_sos_condition)
return prog, V

pass

def save_clf(V: sym.Polynomial, x_set: sym.Variables, kappa: float, pickle_path: str):
"""
Save the CLF to a pickle file.
"""
_, file_extension = os.path.splitext(pickle_path)
assert file_extension in (".pkl", ".pickle"), f"File extension is {file_extension}"
data = {}
data["V"] = compatible_clf_cbf.utils.serialize_polynomial(V, x_set)
data["kappa"] = kappa

if os.path.exists(pickle_path):
overwrite_cmd = input(
f"File {pickle_path} already exists. Overwrite the file? Press [Y/n]:"
)
if overwrite_cmd in ("Y", "y"):
save_cmd = True
else:
save_cmd = False
else:
save_cmd = True

if save_cmd:
with open(pickle_path, "wb") as handle:
pickle.dump(data, handle)


def load_clf(pickle_path: str, x_set: sym.Variables) -> dict:
ret = {}
with open(pickle_path, "rb") as handle:
data = pickle.load(handle)

ret["V"] = compatible_clf_cbf.utils.deserialize_polynomial(data["V"], x_set)
ret["kappa"] = data["kappa"]
return ret
60 changes: 41 additions & 19 deletions compatible_clf_cbf/clf_cbf.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,8 @@ def search_clf_cbf_given_lagrangian(
compatible_states_options: Optional[CompatibleStatesOptions] = None,
solver_id: Optional[solvers.SolverId] = None,
solver_options: Optional[solvers.SolverOptions] = None,
backoff_scale: Optional[float] = None,
backoff_rel_scale: Optional[float] = None,
backoff_abs_scale: Optional[float] = None,
) -> Tuple[
Optional[sym.Polynomial],
Optional[np.ndarray],
Expand Down Expand Up @@ -721,7 +722,9 @@ def search_clf_cbf_given_lagrangian(
elif compatible_states_options is not None:
self._add_compatible_states_options(prog, V, b, compatible_states_options)

result = solve_with_id(prog, solver_id, solver_options, backoff_scale)
result = solve_with_id(
prog, solver_id, solver_options, backoff_rel_scale, backoff_abs_scale
)
if result.is_success():
V_sol = None if V is None else result.GetSolution(V)
b_sol = np.array([result.GetSolution(b_i) for b_i in b])
Expand Down Expand Up @@ -874,7 +877,7 @@ def bilinear_alternation(
inner_ellipsoid_options: Optional[InnerEllipsoidOptions] = None,
binary_search_scale_options: Optional[BinarySearchOptions] = None,
compatible_states_options: Optional[CompatibleStatesOptions] = None,
backoff_scale: Optional[float] = None,
backoff_scales: Optional[List[compatible_clf_cbf.utils.BackoffScale]] = None,
) -> Tuple[Optional[sym.Polynomial], np.ndarray]:
"""
Synthesize the compatible CLF and CBF through bilinear alternation. We
Expand Down Expand Up @@ -915,8 +918,26 @@ def bilinear_alternation(
self.unsafe_regions
)

def evaluate_compatible_states(clf_fun, cbf_funs, x_val):
if clf_fun is not None:
V_candidates = clf_fun.EvaluateIndeterminates(self.x, x_val.T)
print(f"V(candidate_compatible_states)={V_candidates}")
b_candidates = [
b_i.EvaluateIndeterminates(
self.x,
x_val.T,
)
for b_i in cbf_funs
]
for i, b_candidates_val in enumerate(b_candidates):
print(f"b[{i}](candidate_compatible_states)={b_candidates_val}")

for iteration in range(max_iter):
print(f"iteration {iteration}")
if compatible_states_options is not None:
evaluate_compatible_states(
clf, cbf, compatible_states_options.candidate_compatible_states
)
# Search for the Lagrangians.
(
compatible_lagrangians,
Expand Down Expand Up @@ -996,23 +1017,22 @@ def bilinear_alternation(
compatible_states_options=compatible_states_options,
solver_id=solver_id,
solver_options=solver_options,
backoff_scale=backoff_scale,
backoff_rel_scale=(
None
if backoff_scales is None
else backoff_scales[iteration].rel
),
backoff_abs_scale=(
None
if backoff_scales is None
else backoff_scales[iteration].abs
),
)
assert cbf is not None
if clf is not None:
V_candidates = clf.EvaluateIndeterminates(
self.x, compatible_states_options.candidate_compatible_states.T
)
b_candidates = [
b_i.EvaluateIndeterminates(
self.x,
compatible_states_options.candidate_compatible_states.T,
)
for b_i in cbf
]
print(f"V(candidate_compatible_states)={V_candidates}")
for i, b_candidates_val in enumerate(b_candidates):
print(f"b[{i}](candidate_compatible_states)={b_candidates_val}")
if compatible_states_options is not None:
evaluate_compatible_states(
clf, cbf, compatible_states_options.candidate_compatible_states
)
return clf, cbf

def check_compatible_at_state(
Expand Down Expand Up @@ -1559,7 +1579,9 @@ def save_clf_cbf(
kappa_b: np.ndarray,
pickle_path: str,
):
""" """
"""
Save the CLF and CBF to a pickle file.
"""
_, file_extension = os.path.splitext(pickle_path)
assert file_extension in (".pkl", ".pickle"), f"File extension is {file_extension}"
data = {}
Expand Down
67 changes: 46 additions & 21 deletions compatible_clf_cbf/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,51 +226,76 @@ def to_lower_triangular_columns(mat: np.ndarray) -> np.ndarray:
return ret


@dataclasses.dataclass
class BackoffScale:
rel: Optional[float]
abs: Optional[float]


def solve_with_id(
prog: solvers.MathematicalProgram,
solver_id: Optional[solvers.SolverId] = None,
solver_options: Optional[solvers.SolverOptions] = None,
backoff_scale: Optional[float] = None,
backoff_rel_scale: Optional[float] = None,
backoff_abs_scale: Optional[float] = None,
) -> solvers.MathematicalProgramResult:
"""
Args:
backoff_scale: when solving an optimization problem with an objective function,
we first solve the problem to optimality, and then "back off" a little bit to find
a sub-optimal but strictly feasible solution. backoff_scale=0 corresponds to no
backoff. Note that during backing off, we will modify the original `prog`.
backoff_rel_scale: when solving an optimization problem with an objective
function, we first solve the problem to optimality, and then "back off" a
little bit to find a sub-optimal but strictly feasible solution.
backoff_rel_scale=0 corresponds to no backoff. Note that during backing
off, we will modify the original `prog`.
backoff_abs_scale: The absolute scale to back off.
"""
if solver_id is None:
result = solvers.Solve(prog, None, solver_options)
else:
solver = solvers.MakeSolver(solver_id)
result = solver.Solve(prog, None, solver_options)
if (
len(prog.linear_costs()) > 0 or len(prog.quadratic_costs()) > 0
) and backoff_scale is not None:
if (len(prog.linear_costs()) > 0 or len(prog.quadratic_costs()) > 0) and (
backoff_rel_scale is not None or backoff_abs_scale is not None
):
assert (
len(prog.linear_costs()) == 1
), "TODO(hongkai.dai): support program with multiple LinearCost objects."
assert (
len(prog.quadratic_costs()) == 0
), "TODO(hongkai.dai): we currently only support program with linear costs."
assert backoff_scale >= 0, "backoff_scale should be non-negative."
if backoff_rel_scale is not None:
assert backoff_rel_scale >= 0, "backoff_rel_scale should be non-negative."
if backoff_abs_scale is not None:
assert backoff_abs_scale >= 0, "backoff_abs_scale should be non-negative."
# Cannot handle both backoff_rel_scale and backoff_abs_scale
assert (
backoff_rel_scale is None or backoff_abs_scale is None
), "backoff_rel_scale and backoff_abs_scale cannot both be set."

optimal_cost = result.get_optimal_cost()
coeff_cost = prog.linear_costs()[0].evaluator().a()
var_cost = prog.linear_costs()[0].variables()
constant_cost = prog.linear_costs()[0].evaluator().b()
prog.RemoveCost(prog.linear_costs()[0])
cost_upper_bound = (
optimal_cost * (1 + backoff_scale)
if optimal_cost > 0
else optimal_cost * (1 - backoff_scale)
)
prog.AddLinearConstraint(
coeff_cost, -np.inf, cost_upper_bound - constant_cost, var_cost
)
if solver_id is None:
result = solvers.Solve(prog, None, solver_options)
if backoff_rel_scale is not None:
cost_upper_bound = (
optimal_cost * (1 + backoff_rel_scale)
if optimal_cost > 0
else optimal_cost * (1 - backoff_rel_scale)
)
elif backoff_abs_scale is not None:
cost_upper_bound = optimal_cost + backoff_abs_scale
else:
result = solver.Solve(prog, None, solver_options)
assert Exception("backoff_rel_scale or backoff_abs_scale should be set.")
if (backoff_rel_scale is not None and backoff_rel_scale > 0) or (
backoff_abs_scale is not None and backoff_abs_scale
) > 0:
prog.RemoveCost(prog.linear_costs()[0])
prog.AddLinearConstraint(
coeff_cost, -np.inf, cost_upper_bound - constant_cost, var_cost
)
if solver_id is None:
result = solvers.Solve(prog, None, solver_options)
else:
result = solver.Solve(prog, None, solver_options)
return result


Expand Down
6 changes: 5 additions & 1 deletion examples/nonlinear_toy/synthesize_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pydrake.symbolic as sym

from compatible_clf_cbf import clf_cbf
import compatible_clf_cbf.utils
from examples.nonlinear_toy import toy_system


Expand Down Expand Up @@ -84,7 +85,10 @@ def main(with_u_bound: bool):
binary_search_scale_options=None,
compatible_states_options=compatible_states_options,
solver_options=solver_options,
backoff_scale=0.02,
backoff_scales=[
compatible_clf_cbf.utils.BackoffScale(rel=0.02, abs=None)
for _ in range(max_iter)
],
)


Expand Down
Loading

0 comments on commit e37a559

Please sign in to comment.