Skip to content

Commit

Permalink
replace LoopFinderRBT by network based LoopFinder
Browse files Browse the repository at this point in the history
  • Loading branch information
mozman committed May 26, 2024
1 parent 6a684d2 commit 5ce7f67
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 201 deletions.
184 changes: 41 additions & 143 deletions src/ezdxf/edgeminer.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def find_shortest_loop(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Edge]
solutions = sorted(find_all_loops(edges, gap_tol=gap_tol), key=length)
if solutions:
return solutions[0]
return []
return tuple()


def find_longest_loop(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Edge]:
Expand All @@ -164,83 +164,52 @@ def find_longest_loop(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Edge]:
solutions = sorted(find_all_loops(edges, gap_tol=gap_tol), key=length)
if solutions:
return solutions[-1]
return []
return tuple()


def find_first_loop(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Edge]:
"""Returns the first closed loop found.
Note: Recursive backtracking algorithm with time complexity of O(n!).
def find_first_loop(
edges: Sequence[Edge], gap_tol=GAP_TOL, timeout=TIMEOUT
) -> Sequence[Edge]:
"""Returns the first closed loop found in `edges`.
Args:
edges: edges to be examined
gap_tol: maximum vertex distance to consider two edges as connected
"""
finder = LoopFinderRBT(first=True, gap_tol=gap_tol)
available = tuple(type_check(edges))
if len(available) < 2:
return []
finder.search(available[0], available[1:])
solutions = list(finder)
if solutions:
return solutions[0]
return []
timeout: timeout in seconds
Raises:
TimeoutError: search process has timed out
TypeError: invalid data in sequence `edges`
"""
deposit = EdgeDeposit(edges, gap_tol=gap_tol)
if len(deposit.edges) < 2:
return tuple()
return find_first_loop_in_deposit(deposit, timeout=timeout)

def find_first_loop_net(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Edge]:
"""Returns the first closed loop found.

This function is based on networks of connected edges.
def find_first_loop_in_deposit(deposit: EdgeDeposit, timeout=TIMEOUT) -> Sequence[Edge]:
"""Returns the first closed loop found in edge `deposit`.
Args:
edges: edges to be examined
gap_tol: maximum vertex distance to consider two edges as connected
deposit: edge deposit
timeout: timeout in seconds
Raises:
TimeoutError: search process has timed out
"""

edges = type_check(edges)
if len(edges) < 2:
return []
deposit = EdgeDeposit(edges, gap_tol=gap_tol)
networks: list[Network] = []
available = set(edges)
while available:
network = deposit.build_network(available.pop())
if len(network):
networks.append(network)
available -= set(network)

networks.sort(key=lambda n: len(n))
if len(deposit.edges) < 2:
return tuple()
networks = list(deposit.build_all_networks(timeout=timeout))
for network in networks:
finder = LoopFinderNet(network)
finder = LoopFinder(network, timeout=timeout)
loop = finder.find_any_loop()
if loop:
return loop
return []
return tuple()


def find_all_loops(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Sequence[Edge]]:
"""Returns all unique closed loops and doesn't include reversed solutions.
Note: Recursive backtracking algorithm with time complexity of O(n!).
Args:
edges: edges to be examined
gap_tol: maximum vertex distance to consider two edges as connected
"""
finder = LoopFinderRBT(discard_reverse=True, gap_tol=gap_tol)
_edges = list(type_check(edges))
for _ in range(len(edges)):
available = tuple(_edges)
finder.search(available[0], available[1:])
# Rotate the edges and start the search again to get an exhaustive result.
first = _edges.pop(0)
_edges.append(first)
# It's not required to search for disconnected loops - by rotating and restarting,
# every possible loop is taken into account.
return tuple(finder)


def find_all_loops_net(
def find_all_loops(
edges: Sequence[Edge], gap_tol=GAP_TOL, timeout=TIMEOUT
) -> Sequence[Sequence[Edge]]:
"""Returns all unique closed loops and doesn't include reversed solutions.
Expand All @@ -265,8 +234,8 @@ def find_all_loops_net(
def find_all_loops_in_deposit(
deposit: EdgeDeposit, timeout=TIMEOUT
) -> Sequence[Sequence[Edge]]:
"""Returns all unique closed loops in found in the edge deposit and doesn't include
reversed solutions.
"""Returns all unique closed loops in found in the edge `deposit` and doesn't
include reversed solutions.
Note: Recursive backtracking algorithm with time complexity of O(n!).
Expand All @@ -280,7 +249,7 @@ def find_all_loops_in_deposit(
gap_tol = deposit.gap_tol
solutions: list[Sequence[Edge]] = []
for network in deposit.build_all_networks(timeout=timeout):
finder = LoopFinderNet(network, gap_tol=gap_tol, timeout=timeout)
finder = LoopFinder(network, gap_tol=gap_tol, timeout=timeout)
for edge in network:
finder.search(edge)
solutions.extend(finder)
Expand Down Expand Up @@ -360,87 +329,13 @@ def key(self, reverse=False) -> tuple[int, ...]:
ids = ids[index:] + ids[:index]
return ids

def ordered(self, reverse=False) -> Iterator[Edge]:
"""Returns the loop edges in key order."""
edges = {e.id: e for e in self.edges}
return (edges[eid] for eid in self.key(reverse))

SearchSolutions: TypeAlias = Dict[Tuple[int, ...], Tuple[Edge, ...]]


class LoopFinderRBT:
"""Recursive backtracking algorithm to find closed loops with time complexity of O(n!).
Args:
first: flag to stop the search at the first loop found
discard_reverse: discard loops that are identical to found loops but in reverse order
gap_tol: maximum vertex distance to consider two edges as connected
"""

def __init__(self, first=False, discard_reverse=True, gap_tol=GAP_TOL) -> None:
self._solutions: SearchSolutions = {}
self._stop_at_first_solution = first
self._discard_reverse_solutions = discard_reverse
self._gap_tol = gap_tol
self.watchdog = Watchdog()

def __iter__(self) -> Iterator[tuple[Edge, ...]]:
"""Yields all loops found as sequences of edges."""
return iter(self._solutions.values())

def __len__(self) -> int:
"""Returns the count of loops found."""
return len(self._solutions)

def search(self, start: Edge, available: Sequence[Edge], timeout=TIMEOUT):
"""Searches for closed loops in the available edges, starting from the given
start edge.
The starting edge cannot exist in the available edges and the
avalibale edges cannot have duplicate edges.
Args:
start: staring edge
available: available edges
Raises:
DuplicateEdgesError: duplicate edges or starting edge in available edges
TimeoutError: search process has timed out
RecursionError: search exceeded Python's recursion limit
"""
ids = [e.id for e in available]
unique_ids = set(ids)
if len(ids) != len(unique_ids):
raise DuplicateEdgesError("available edges cannot have duplicate edges")
if start.id in unique_ids:
raise DuplicateEdgesError("starting edge cannot exist in available edges")
self.watchdog.start(timeout)
try:
self._search(Loop((start,)), tuple(available))
except StopSearchException:
pass

def _search(self, loop: Loop, available: tuple[Edge, ...]):
for next_edge in available:
if self.watchdog.has_timed_out:
raise TimeoutError("search process has timed out")
edge = next_edge
extended_loop: Loop | None = None
if loop.is_connected(edge, self._gap_tol):
extended_loop = Loop(loop.edges + (edge,))
else:
edge = next_edge.reversed()
if loop.is_connected(edge, self._gap_tol):
extended_loop = Loop(loop.edges + (edge,))

if extended_loop is None:
continue

if extended_loop.is_closed_loop(self._gap_tol):
add_search_solution(
self._solutions, extended_loop, self._discard_reverse_solutions
)
if self._stop_at_first_solution:
raise StopSearchException
else: # depth search, may raise RecursionError
_id = edge.id
self._search(extended_loop, tuple(e for e in available if e.id != _id))
SearchSolutions: TypeAlias = Dict[Tuple[int, ...], Tuple[Edge, ...]]


class EdgeVertexIndex:
Expand Down Expand Up @@ -568,7 +463,8 @@ def process(vertex: Vec3) -> None:
return network

def build_all_networks(self, timeout=TIMEOUT) -> Sequence[Network]:
"""Returns all separated networks in this deposit.
"""Returns all separated networks in this deposit in ascending order of edge
count.
Raises:
TimeoutError: build process has timed out
Expand All @@ -586,6 +482,8 @@ def build_all_networks(self, timeout=TIMEOUT) -> Sequence[Network]:
edges -= set(network)
else: # solitary edge
edges.discard(edge)

networks.sort(key=lambda n: len(n))
return networks


Expand Down Expand Up @@ -625,7 +523,7 @@ def edges_linked_to(self, edge: Edge) -> Sequence[Edge]:
return tuple(self._edges[eid] for eid in self._connections[edge.id])


class LoopFinderNet:
class LoopFinder:
def __init__(
self, network: Network, discard_reverse=True, gap_tol=GAP_TOL, timeout=TIMEOUT
) -> None:
Expand Down
75 changes: 17 additions & 58 deletions tests/test_05_tools/test_546_edgeminer.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ def test_key(self):


def collect_payload(edges: Sequence[em.Edge]) -> str:
return ",".join([e.payload for e in edges])
"""Returns the payload as strings in key order.
Key order:
Loop starts with the edge with the smallest id.
"""
loop = em.Loop(edges) # type: ignore
return ",".join([e.payload for e in loop.ordered()])


class SimpleLoops:
Expand All @@ -114,55 +120,7 @@ class SimpleLoops:
G = em.Edge((2, 1), (1, 1), payload="G")


class TestLoopFinderRBTSimple(SimpleLoops):
def test_unique_available_edges_required(self):
finder = em.LoopFinderRBT()
with pytest.raises(em.DuplicateEdgesError):
finder.search(self.A, available=(self.B, self.B, self.B))

def test_start_edge_not_in_available_edges(self):
finder = em.LoopFinderRBT()
with pytest.raises(em.DuplicateEdgesError):
finder.search(self.A, available=(self.A, self.C, self.D))

def test_loop_A_B_C_D(self):
finder = em.LoopFinderRBT()
finder.search(self.A, (self.B, self.C, self.D))
solutions = list(finder)
assert len(solutions) == 1
assert collect_payload(solutions[0]) == "A,B,C,D"

def test_loop_D_A_B_C(self):
finder = em.LoopFinderRBT()
finder.search(self.D, (self.A, self.B, self.C))
solutions = list(finder)
assert len(solutions) == 1
assert collect_payload(solutions[0]) == "D,A,B,C"

def test_loop_A_to_D_unique_solutions(self):
finder = em.LoopFinderRBT()
finder.search(self.A, (self.B, self.C, self.D))
# rotated edges, same loop
finder.search(self.D, (self.A, self.B, self.C))
solutions = list(finder)
assert len(solutions) == 1

def test_loops_A_to_G(self):
finder = em.LoopFinderRBT()
finder.search(self.A, (self.B, self.C, self.D, self.E, self.F, self.G))
solutions = list(finder)
assert len(solutions) == 2
assert collect_payload(solutions[0]) == "A,B,C,D"
assert collect_payload(solutions[1]) == "A,E,F,G,C,D"

def test_stop_at_first_solution(self):
finder = em.LoopFinderRBT(first=True)
finder.search(self.A, (self.B, self.C, self.D, self.E, self.F, self.G))
solutions = list(finder)
assert len(solutions) == 1


class TestLoopFinderNetSimple(SimpleLoops):
class TestLoopFinderSimple(SimpleLoops):

@pytest.fixture(scope="class")
def netAD(self):
Expand All @@ -177,34 +135,34 @@ def netAG(self):
return deposit.build_network(self.A)

def test_find_any_loop(self, netAG):
finder = em.LoopFinderNet(netAG)
finder = em.LoopFinder(netAG)
loop = finder.find_any_loop(start=self.A)
assert len(loop) > 3

def test_loop_A_B_C_D(self, netAD):
finder = em.LoopFinderNet(netAD)
finder = em.LoopFinder(netAD)
finder.search(self.A)
solutions = list(finder)
assert len(solutions) == 1
assert collect_payload(solutions[0]) == "A,B,C,D"

def test_loop_D_A_B_C(self, netAD):
finder = em.LoopFinderNet(netAD)
finder = em.LoopFinder(netAD)
finder.search(self.D)
solutions = list(finder)
assert len(solutions) == 1
assert collect_payload(solutions[0]) == "D,A,B,C"
assert collect_payload(solutions[0]) == "A,B,C,D"

def test_loop_A_to_D_unique_solutions(self, netAD):
finder = em.LoopFinderNet(netAD)
finder = em.LoopFinder(netAD)
finder.search(self.A)
# rotated edges, same loop
finder.search(self.D)
solutions = list(finder)
assert len(solutions) == 1

def test_loops_A_to_G(self, netAG):
finder = em.LoopFinderNet(netAG, timeout=10)
finder = em.LoopFinder(netAG, timeout=10)
finder.search(self.A)
solutions = list(finder)
assert len(solutions) == 2
Expand Down Expand Up @@ -268,6 +226,7 @@ def test_find_all_loops(self):
)
assert len(solutions) == 2
solution_strings = [collect_payload(s) for s in solutions]

assert "A,B,C,D" in solution_strings
assert "E,F,G,H" in solution_strings

Expand All @@ -277,8 +236,8 @@ def test_find_all_shuffled_loops(self):
)
assert len(solutions) == 2
solution_strings = [collect_payload(s) for s in solutions]
assert "B,C,D,A" in solution_strings
assert "H,E,F,G" in solution_strings
assert "A,B,C,D" in solution_strings
assert "E,F,G,H" in solution_strings


class TestEdgeDeposit(SimpleLoops):
Expand Down

0 comments on commit 5ce7f67

Please sign in to comment.