Skip to content

Commit

Permalink
add loop finder based on networks
Browse files Browse the repository at this point in the history
  • Loading branch information
mozman committed May 26, 2024
1 parent 2afecc7 commit 7a8858b
Show file tree
Hide file tree
Showing 2 changed files with 250 additions and 26 deletions.
191 changes: 167 additions & 24 deletions src/ezdxf/edgeminer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
"""
from __future__ import annotations
from typing import Any, Sequence, Iterator, Iterable
from typing_extensions import Self
from typing import Any, Sequence, Iterator, Iterable, Dict, Tuple
from typing_extensions import Self, TypeAlias
from collections import defaultdict
import time

from ezdxf.math import UVec, Vec3, distance_point_line_3d
from ezdxf.math import rtree


__all__ = [
"find_all_loops",
"find_first_loop",
Expand Down Expand Up @@ -58,6 +59,8 @@ class Edge:
Therefore, the length of the edge must be specified if the length calculation for
a sequence of edges is to be possible.
This class is immutable by design!
Attributes:
id: unique id as int
start: start vertex as Vec3
Expand Down Expand Up @@ -89,6 +92,17 @@ def __eq__(self, other) -> bool:
return self.id == other.id
return False

def __repr__(self) -> str:
if self.payload is None:
content = str(self.id)
else:
content = str(self.payload)
return f"Edge({content})"

def __hash__(self) -> int:
# edges can be used in sets and set-operations
return self.id

def reversed(self) -> Self:
"""Returns a reversed copy."""
edge = self.__class__(self.end, self.start, self.length, self.payload)
Expand All @@ -97,6 +111,13 @@ def reversed(self) -> Self:
return edge


def isclose(a: Vec3, b: Vec3, gap_tol: float = GAP_TOL) -> bool:
"""This function should be used to test whether two vertices are close to each other
to get consistent results.
"""
return a.distance(b) < gap_tol


class Watchdog:
def __init__(self, timeout=TIMEOUT) -> None:
self.timeout: float = timeout
Expand Down Expand Up @@ -166,6 +187,37 @@ def find_first_loop(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Edge]:
return []


def find_first_loop_net(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Edge]:
"""Returns the first closed loop found.
This function is based on networks of connected edges.
Args:
edges: edges to be examined
gap_tol: maximum vertex distance to consider two edges as connected
"""

edges = type_check(edges)
if len(edges) < 2:
return []
deposit = EdgeDeposit(edges, gap_tol=gap_tol)
networks: list[Network] = []
available = set(edges)
while available:
network = deposit.build_network(available.pop())
if len(network):
networks.append(network)
available -= set(network)

networks.sort(key=lambda n: len(n))
for network in networks:
finder = LoopFinderNet(network)
loop = finder.find_any_loop()
if loop:
return loop
return []


def find_all_loops(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Sequence[Edge]]:
"""Returns all unique closed loops and doesn't include reversed solutions.
Expand Down Expand Up @@ -201,7 +253,7 @@ def filter_short_edges(edges: Iterable[Edge], gap_tol=GAP_TOL) -> tuple[Edge, ..
These edges represent very short curves or maybe closed curves like circles and
ellipses.
"""
return tuple(e for e in edges if e.start.distance(e.end) >= gap_tol)
return tuple(e for e in edges if not isclose(e.start, e.end, gap_tol))


class Loop:
Expand All @@ -216,6 +268,13 @@ class Loop:
def __init__(self, edges: tuple[Edge, ...]) -> None:
self.edges: tuple[Edge, ...] = edges

def __repr__(self) -> str:
content = ",".join(str(e) for e in self.edges)
return f"Loop([{content}])"

def __len__(self) -> int:
return len(self.edges)

def is_connected(self, edge: Edge, gap_tol=GAP_TOL) -> bool:
"""Returns ``True`` if the last edge of the loop is connected to the given edge.
Expand All @@ -224,7 +283,7 @@ def is_connected(self, edge: Edge, gap_tol=GAP_TOL) -> bool:
gap_tol: maximum vertex distance to consider two edges as connected
"""
if self.edges:
return self.edges[-1].end.distance(edge.start) < gap_tol
return isclose(self.edges[-1].end, edge.start, gap_tol)
return False

def is_closed_loop(self, gap_tol=GAP_TOL) -> bool:
Expand All @@ -235,7 +294,7 @@ def is_closed_loop(self, gap_tol=GAP_TOL) -> bool:
"""

if len(self.edges) > 1:
return self.edges[0].start.distance(self.edges[-1].end) < gap_tol
return isclose(self.edges[0].start, self.edges[-1].end, gap_tol)
return False

def key(self, reverse=False) -> tuple[int, ...]:
Expand All @@ -255,6 +314,9 @@ def key(self, reverse=False) -> tuple[int, ...]:
return ids


SearchSolutions: TypeAlias = Dict[Tuple[int, ...], Tuple[Edge, ...]]


class LoopFinderRBT:
"""Recursive backtracking algorithm to find closed loops with time complexity of O(n!).
Expand All @@ -265,7 +327,7 @@ class LoopFinderRBT:
"""

def __init__(self, first=False, discard_reverse=True, gap_tol=GAP_TOL) -> None:
self._solutions: dict[tuple[int, ...], tuple[Edge, ...]] = {}
self._solutions: SearchSolutions = {}
self._stop_at_first_solution = first
self._discard_reverse_solutions = discard_reverse
self._gap_tol = gap_tol
Expand Down Expand Up @@ -310,7 +372,7 @@ def search(self, start: Edge, available: Sequence[Edge], timeout=TIMEOUT):
def _search(self, loop: Loop, available: tuple[Edge, ...]):
for next_edge in available:
if self.watchdog.has_timed_out:
raise TimeoutError("search process has timed out")
raise TimeoutError("search process has timed out")
edge = next_edge
extended_loop: Loop | None = None
if loop.is_connected(edge, self._gap_tol):
Expand All @@ -324,24 +386,15 @@ def _search(self, loop: Loop, available: tuple[Edge, ...]):
continue

if extended_loop.is_closed_loop(self._gap_tol):
self._append_solution(extended_loop)
add_search_solution(
self._solutions, extended_loop, self._discard_reverse_solutions
)
if self._stop_at_first_solution:
raise StopSearchException
else: # depth search, may raise RecursionError
_id = edge.id
self._search(extended_loop, tuple(e for e in available if e.id != _id))

def _append_solution(self, loop: Loop) -> None:
key = loop.key()
if key in self._solutions:
return
if (
self._discard_reverse_solutions
and loop.key(reverse=True) in self._solutions
):
return
self._solutions[key] = loop.edges


class EdgeVertexIndex:
"""Index of edges referenced by the id of their start- and end vertices.
Expand Down Expand Up @@ -370,7 +423,7 @@ def find_edges(self, vertices: Iterable[Vec3]) -> Sequence[Edge]:
return edges


class SearchIndex:
class SpatialSearchIndex:
"""Spatial search index of all edge vertices."""

def __init__(self, edges: Sequence[Edge]) -> None:
Expand Down Expand Up @@ -399,10 +452,12 @@ def discard_ids(edges: Iterable[Edge], ids: set[int]) -> Sequence[Edge]:


class EdgeDeposit:
"""The edge deposit stores all available edges for further searches."""

def __init__(self, edges: Sequence[Edge], gap_tol=GAP_TOL) -> None:
self.gap_tol = gap_tol
self.edge_index = EdgeVertexIndex(edges)
self.search_index = SearchIndex(edges)
self.search_index = SpatialSearchIndex(edges)

def edges_linked_to(self, vertex: UVec, radius: float = -1) -> Sequence[Edge]:
"""Returns all edges linked to `vertex` in range of `radius`.
Expand Down Expand Up @@ -444,32 +499,38 @@ def build_network(self, edge: Edge, timeout=TIMEOUT) -> Network:

def process(vertex: Vec3) -> None:
linked_edges = self.edges_linked_to(vertex)
linked_edges = discard_ids(linked_edges, ids=found)
linked_edges = discard_ids(linked_edges, ids=done)
if linked_edges:
found.update(e.id for e in linked_edges)
network.add_connections(edge, linked_edges)
todo.extend(linked_edges)

network = Network()
found = set([edge.id])
done: set[int] = set()
todo: list[Edge] = [edge]
watchdog = Watchdog(timeout)

while todo:
if watchdog.has_timed_out:
raise TimeoutError("build process has timed out")
edge = todo.pop()
done.add(edge.id)
process(edge.start)
process(edge.end)

return network


class Network:
"""The all edges in a network are reachable from every other edge."""

def __init__(self) -> None:
self._edges: dict[int, Edge] = {}
self._connections: dict[int, set[int]] = defaultdict(set)

def __repr__(self) -> str:
content = ",".join(str(e) for e in self)
return f"Network([{content}])"

def __len__(self) -> int:
return len(self._edges)

Expand All @@ -480,6 +541,8 @@ def __contains__(self, edge: Edge) -> bool:
return edge.id in self._edges

def add_connection(self, base: Edge, target: Edge) -> None:
if base.id == target.id:
return
self._edges[base.id] = base
self._edges[target.id] = target
self._connections[base.id].add(target.id)
Expand All @@ -491,3 +554,83 @@ def add_connections(self, base: Edge, targets: Iterable[Edge]) -> None:

def edges_linked_to(self, edge: Edge) -> Sequence[Edge]:
return tuple(self._edges[eid] for eid in self._connections[edge.id])

class LoopFinderNet:
def __init__(
self, network: Network, discard_reverse=True, gap_tol=GAP_TOL, timeout=TIMEOUT
) -> None:
if len(network) < 2:
raise ValueError("two or more network nodes required")
self._network = network
self._discard_reverse_solutions = discard_reverse
self._gap_tol = gap_tol
self._timeout = timeout
self._solutions: SearchSolutions = {}

def __iter__(self) -> Iterator[Sequence[Edge]]:
return iter(self._solutions.values())

def __len__(self) -> int:
return len(self._solutions)

def find_any_loop(self, start: Edge | None = None) -> Sequence[Edge]:
"""Returns the first loop found beginning with the given start edge or an
arbitrary edge if `start` is None.
"""
if start is None:
start = next(iter(self._network))

self.search(start, stop_at_first_loop=True)
try:
return next(iter(self._solutions.values()))
except StopIteration:
return tuple()

def search(self, start: Edge, stop_at_first_loop: bool = False) -> None:
"""Searches for all loops that begin at the given start edge.
These are not all possible loops in a network!
"""
if start not in self._network:
raise ValueError("start edge not in network")
network = self._network
gap_tol = self._gap_tol
solutions = self._solutions

watchdog = Watchdog(self._timeout)
todo: list[Loop] = [Loop((start,))] # "unlimited" recursion stack
while todo:
if watchdog.has_timed_out:
raise TimeoutError("search process has timed out")
loop = todo.pop()
linked_edges = network.edges_linked_to(loop.edges[-1])
# edges must be unique in a loop
for edge in set(linked_edges) - set(loop.edges):
extended_loop: Loop | None = None
if loop.is_connected(edge):
extended_loop = Loop(loop.edges + (edge,))
else:
reversed_edge = edge.reversed()
if loop.is_connected(reversed_edge):
extended_loop = Loop(loop.edges + (reversed_edge,))
if extended_loop is None:
continue
if extended_loop.is_closed_loop(gap_tol):
add_search_solution(
solutions, extended_loop, self._discard_reverse_solutions
)
if stop_at_first_loop:
return
else:
todo.append(extended_loop)


def add_search_solution(
solutions: SearchSolutions, loop: Loop, discard_reversed_loops: bool
) -> None:
key = loop.key()
if key in solutions:
return
if discard_reversed_loops and loop.key(reverse=True) in solutions:
return
solutions[key] = loop.edges
Loading

0 comments on commit 7a8858b

Please sign in to comment.