diff --git a/src/ezdxf/edgeminer.py b/src/ezdxf/edgeminer.py index 05868cdf2..f9a44f59c 100644 --- a/src/ezdxf/edgeminer.py +++ b/src/ezdxf/edgeminer.py @@ -13,14 +13,15 @@ """ from __future__ import annotations -from typing import Any, Sequence, Iterator, Iterable -from typing_extensions import Self +from typing import Any, Sequence, Iterator, Iterable, Dict, Tuple +from typing_extensions import Self, TypeAlias from collections import defaultdict import time from ezdxf.math import UVec, Vec3, distance_point_line_3d from ezdxf.math import rtree + __all__ = [ "find_all_loops", "find_first_loop", @@ -58,6 +59,8 @@ class Edge: Therefore, the length of the edge must be specified if the length calculation for a sequence of edges is to be possible. + This class is immutable by design! + Attributes: id: unique id as int start: start vertex as Vec3 @@ -89,6 +92,17 @@ def __eq__(self, other) -> bool: return self.id == other.id return False + def __repr__(self) -> str: + if self.payload is None: + content = str(self.id) + else: + content = str(self.payload) + return f"Edge({content})" + + def __hash__(self) -> int: + # edges can be used in sets and set-operations + return self.id + def reversed(self) -> Self: """Returns a reversed copy.""" edge = self.__class__(self.end, self.start, self.length, self.payload) @@ -97,6 +111,13 @@ def reversed(self) -> Self: return edge +def isclose(a: Vec3, b: Vec3, gap_tol: float = GAP_TOL) -> bool: + """This function should be used to test whether two vertices are close to each other + to get consistent results. + """ + return a.distance(b) < gap_tol + + class Watchdog: def __init__(self, timeout=TIMEOUT) -> None: self.timeout: float = timeout @@ -166,6 +187,37 @@ def find_first_loop(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Edge]: return [] +def find_first_loop_net(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Edge]: + """Returns the first closed loop found. + + This function is based on networks of connected edges. + + Args: + edges: edges to be examined + gap_tol: maximum vertex distance to consider two edges as connected + """ + + edges = type_check(edges) + if len(edges) < 2: + return [] + deposit = EdgeDeposit(edges, gap_tol=gap_tol) + networks: list[Network] = [] + available = set(edges) + while available: + network = deposit.build_network(available.pop()) + if len(network): + networks.append(network) + available -= set(network) + + networks.sort(key=lambda n: len(n)) + for network in networks: + finder = LoopFinderNet(network) + loop = finder.find_any_loop() + if loop: + return loop + return [] + + def find_all_loops(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Sequence[Edge]]: """Returns all unique closed loops and doesn't include reversed solutions. @@ -201,7 +253,7 @@ def filter_short_edges(edges: Iterable[Edge], gap_tol=GAP_TOL) -> tuple[Edge, .. These edges represent very short curves or maybe closed curves like circles and ellipses. """ - return tuple(e for e in edges if e.start.distance(e.end) >= gap_tol) + return tuple(e for e in edges if not isclose(e.start, e.end, gap_tol)) class Loop: @@ -216,6 +268,13 @@ class Loop: def __init__(self, edges: tuple[Edge, ...]) -> None: self.edges: tuple[Edge, ...] = edges + def __repr__(self) -> str: + content = ",".join(str(e) for e in self.edges) + return f"Loop([{content}])" + + def __len__(self) -> int: + return len(self.edges) + def is_connected(self, edge: Edge, gap_tol=GAP_TOL) -> bool: """Returns ``True`` if the last edge of the loop is connected to the given edge. @@ -224,7 +283,7 @@ def is_connected(self, edge: Edge, gap_tol=GAP_TOL) -> bool: gap_tol: maximum vertex distance to consider two edges as connected """ if self.edges: - return self.edges[-1].end.distance(edge.start) < gap_tol + return isclose(self.edges[-1].end, edge.start, gap_tol) return False def is_closed_loop(self, gap_tol=GAP_TOL) -> bool: @@ -235,7 +294,7 @@ def is_closed_loop(self, gap_tol=GAP_TOL) -> bool: """ if len(self.edges) > 1: - return self.edges[0].start.distance(self.edges[-1].end) < gap_tol + return isclose(self.edges[0].start, self.edges[-1].end, gap_tol) return False def key(self, reverse=False) -> tuple[int, ...]: @@ -255,6 +314,9 @@ def key(self, reverse=False) -> tuple[int, ...]: return ids +SearchSolutions: TypeAlias = Dict[Tuple[int, ...], Tuple[Edge, ...]] + + class LoopFinderRBT: """Recursive backtracking algorithm to find closed loops with time complexity of O(n!). @@ -265,7 +327,7 @@ class LoopFinderRBT: """ def __init__(self, first=False, discard_reverse=True, gap_tol=GAP_TOL) -> None: - self._solutions: dict[tuple[int, ...], tuple[Edge, ...]] = {} + self._solutions: SearchSolutions = {} self._stop_at_first_solution = first self._discard_reverse_solutions = discard_reverse self._gap_tol = gap_tol @@ -310,7 +372,7 @@ def search(self, start: Edge, available: Sequence[Edge], timeout=TIMEOUT): def _search(self, loop: Loop, available: tuple[Edge, ...]): for next_edge in available: if self.watchdog.has_timed_out: - raise TimeoutError("search process has timed out") + raise TimeoutError("search process has timed out") edge = next_edge extended_loop: Loop | None = None if loop.is_connected(edge, self._gap_tol): @@ -324,24 +386,15 @@ def _search(self, loop: Loop, available: tuple[Edge, ...]): continue if extended_loop.is_closed_loop(self._gap_tol): - self._append_solution(extended_loop) + add_search_solution( + self._solutions, extended_loop, self._discard_reverse_solutions + ) if self._stop_at_first_solution: raise StopSearchException else: # depth search, may raise RecursionError _id = edge.id self._search(extended_loop, tuple(e for e in available if e.id != _id)) - def _append_solution(self, loop: Loop) -> None: - key = loop.key() - if key in self._solutions: - return - if ( - self._discard_reverse_solutions - and loop.key(reverse=True) in self._solutions - ): - return - self._solutions[key] = loop.edges - class EdgeVertexIndex: """Index of edges referenced by the id of their start- and end vertices. @@ -370,7 +423,7 @@ def find_edges(self, vertices: Iterable[Vec3]) -> Sequence[Edge]: return edges -class SearchIndex: +class SpatialSearchIndex: """Spatial search index of all edge vertices.""" def __init__(self, edges: Sequence[Edge]) -> None: @@ -399,10 +452,12 @@ def discard_ids(edges: Iterable[Edge], ids: set[int]) -> Sequence[Edge]: class EdgeDeposit: + """The edge deposit stores all available edges for further searches.""" + def __init__(self, edges: Sequence[Edge], gap_tol=GAP_TOL) -> None: self.gap_tol = gap_tol self.edge_index = EdgeVertexIndex(edges) - self.search_index = SearchIndex(edges) + self.search_index = SpatialSearchIndex(edges) def edges_linked_to(self, vertex: UVec, radius: float = -1) -> Sequence[Edge]: """Returns all edges linked to `vertex` in range of `radius`. @@ -444,14 +499,13 @@ def build_network(self, edge: Edge, timeout=TIMEOUT) -> Network: def process(vertex: Vec3) -> None: linked_edges = self.edges_linked_to(vertex) - linked_edges = discard_ids(linked_edges, ids=found) + linked_edges = discard_ids(linked_edges, ids=done) if linked_edges: - found.update(e.id for e in linked_edges) network.add_connections(edge, linked_edges) todo.extend(linked_edges) network = Network() - found = set([edge.id]) + done: set[int] = set() todo: list[Edge] = [edge] watchdog = Watchdog(timeout) @@ -459,6 +513,7 @@ def process(vertex: Vec3) -> None: if watchdog.has_timed_out: raise TimeoutError("build process has timed out") edge = todo.pop() + done.add(edge.id) process(edge.start) process(edge.end) @@ -466,10 +521,16 @@ def process(vertex: Vec3) -> None: class Network: + """The all edges in a network are reachable from every other edge.""" + def __init__(self) -> None: self._edges: dict[int, Edge] = {} self._connections: dict[int, set[int]] = defaultdict(set) + def __repr__(self) -> str: + content = ",".join(str(e) for e in self) + return f"Network([{content}])" + def __len__(self) -> int: return len(self._edges) @@ -480,6 +541,8 @@ def __contains__(self, edge: Edge) -> bool: return edge.id in self._edges def add_connection(self, base: Edge, target: Edge) -> None: + if base.id == target.id: + return self._edges[base.id] = base self._edges[target.id] = target self._connections[base.id].add(target.id) @@ -491,3 +554,83 @@ def add_connections(self, base: Edge, targets: Iterable[Edge]) -> None: def edges_linked_to(self, edge: Edge) -> Sequence[Edge]: return tuple(self._edges[eid] for eid in self._connections[edge.id]) + +class LoopFinderNet: + def __init__( + self, network: Network, discard_reverse=True, gap_tol=GAP_TOL, timeout=TIMEOUT + ) -> None: + if len(network) < 2: + raise ValueError("two or more network nodes required") + self._network = network + self._discard_reverse_solutions = discard_reverse + self._gap_tol = gap_tol + self._timeout = timeout + self._solutions: SearchSolutions = {} + + def __iter__(self) -> Iterator[Sequence[Edge]]: + return iter(self._solutions.values()) + + def __len__(self) -> int: + return len(self._solutions) + + def find_any_loop(self, start: Edge | None = None) -> Sequence[Edge]: + """Returns the first loop found beginning with the given start edge or an + arbitrary edge if `start` is None. + """ + if start is None: + start = next(iter(self._network)) + + self.search(start, stop_at_first_loop=True) + try: + return next(iter(self._solutions.values())) + except StopIteration: + return tuple() + + def search(self, start: Edge, stop_at_first_loop: bool = False) -> None: + """Searches for all loops that begin at the given start edge. + + These are not all possible loops in a network! + """ + if start not in self._network: + raise ValueError("start edge not in network") + network = self._network + gap_tol = self._gap_tol + solutions = self._solutions + + watchdog = Watchdog(self._timeout) + todo: list[Loop] = [Loop((start,))] # "unlimited" recursion stack + while todo: + if watchdog.has_timed_out: + raise TimeoutError("search process has timed out") + loop = todo.pop() + linked_edges = network.edges_linked_to(loop.edges[-1]) + # edges must be unique in a loop + for edge in set(linked_edges) - set(loop.edges): + extended_loop: Loop | None = None + if loop.is_connected(edge): + extended_loop = Loop(loop.edges + (edge,)) + else: + reversed_edge = edge.reversed() + if loop.is_connected(reversed_edge): + extended_loop = Loop(loop.edges + (reversed_edge,)) + if extended_loop is None: + continue + if extended_loop.is_closed_loop(gap_tol): + add_search_solution( + solutions, extended_loop, self._discard_reverse_solutions + ) + if stop_at_first_loop: + return + else: + todo.append(extended_loop) + + +def add_search_solution( + solutions: SearchSolutions, loop: Loop, discard_reversed_loops: bool +) -> None: + key = loop.key() + if key in solutions: + return + if discard_reversed_loops and loop.key(reverse=True) in solutions: + return + solutions[key] = loop.edges diff --git a/tests/test_05_tools/test_546_edgeminer.py b/tests/test_05_tools/test_546_edgeminer.py index 5f716fdcd..2b343ba08 100644 --- a/tests/test_05_tools/test_546_edgeminer.py +++ b/tests/test_05_tools/test_546_edgeminer.py @@ -35,6 +35,17 @@ def test_reversed_copy(self): assert edge.reverse is (not clone.reverse) assert edge.payload is clone.payload + def test_edge_can_be_used_in_sets(self): + A = em.Edge((0, 0), (1, 0)) + B = em.Edge((1, 0), (1, 1)) + C = em.Edge((1, 1), (0, 1)) + + s1 = set([A, B]) + s2 = set([C, B]) + result = s1.intersection(s2) + assert len(result) == 1 + assert B in result + def test_filter_short_edges(): A = em.Edge((0, 0), (0, 0)) @@ -103,7 +114,7 @@ class SimpleLoops: G = em.Edge((2, 1), (1, 1), payload="G") -class TestLoopFinderSimple(SimpleLoops): +class TestLoopFinderRBTSimple(SimpleLoops): def test_unique_available_edges_required(self): finder = em.LoopFinderRBT() with pytest.raises(em.DuplicateEdgesError): @@ -151,6 +162,57 @@ def test_stop_at_first_solution(self): assert len(solutions) == 1 +class TestLoopFinderNetSimple(SimpleLoops): + + @pytest.fixture(scope="class") + def netAD(self): + deposit = em.EdgeDeposit([self.A, self.B, self.C, self.D]) + return deposit.build_network(self.A) + + @pytest.fixture(scope="class") + def netAG(self): + deposit = em.EdgeDeposit( + [self.A, self.B, self.C, self.D, self.E, self.F, self.G] + ) + return deposit.build_network(self.A) + + def test_find_any_loop(self, netAG): + finder = em.LoopFinderNet(netAG) + loop = finder.find_any_loop(start=self.A) + assert len(loop) > 3 + + def test_loop_A_B_C_D(self, netAD): + finder = em.LoopFinderNet(netAD) + finder.search(self.A) + solutions = list(finder) + assert len(solutions) == 1 + assert collect_payload(solutions[0]) == "A,B,C,D" + + def test_loop_D_A_B_C(self, netAD): + finder = em.LoopFinderNet(netAD) + finder.search(self.D) + solutions = list(finder) + assert len(solutions) == 1 + assert collect_payload(solutions[0]) == "D,A,B,C" + + def test_loop_A_to_D_unique_solutions(self, netAD): + finder = em.LoopFinderNet(netAD) + finder.search(self.A) + # rotated edges, same loop + finder.search(self.D) + solutions = list(finder) + assert len(solutions) == 1 + + def test_loops_A_to_G(self, netAG): + finder = em.LoopFinderNet(netAG, timeout=10) + finder.search(self.A) + solutions = list(finder) + assert len(solutions) == 2 + expected = {"A,B,C,D", "A,E,F,G,C,D"} + assert collect_payload(solutions[0]) in expected + assert collect_payload(solutions[1]) in expected + + class TestAPIFunction(SimpleLoops): def test_find_all_loop(self): solutions = em.find_all_loops( @@ -271,10 +333,29 @@ def test_build_network_A_G(self): # network of all edges connected directly or indirectly to B network = deposit.build_network(self.B) assert len(network) == 7 - # all edges connected directly to B + + # all edges connected directly to A + edges = network.edges_linked_to(self.A) + assert len(edges) == 3 + edges = network.edges_linked_to(self.B) assert len(edges) == 4 + edges = network.edges_linked_to(self.C) + assert len(edges) == 3 + + edges = network.edges_linked_to(self.D) + assert len(edges) == 2 + + edges = network.edges_linked_to(self.E) + assert len(edges) == 3 + + edges = network.edges_linked_to(self.F) + assert len(edges) == 2 + + edges = network.edges_linked_to(self.G) + assert len(edges) == 3 + if __name__ == "__main__": pytest.main([__file__])