Skip to content

Commit

Permalink
add sequential_search_all() function
Browse files Browse the repository at this point in the history
  • Loading branch information
mozman committed May 27, 2024
1 parent b8ed81a commit 1aa861f
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 79 deletions.
199 changes: 132 additions & 67 deletions src/ezdxf/edgeminer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""
from __future__ import annotations
from typing import Any, Sequence, Iterator, Iterable, Dict, Tuple, Callable
from typing import Any, Sequence, Iterator, Iterable, Dict, Tuple
from typing_extensions import Self, TypeAlias
from collections import defaultdict
import time
Expand All @@ -23,14 +23,22 @@


__all__ = [
"Edge",
"EdgeDeposit",
"find_all_loops_in_deposit",
"find_all_loops",
"find_first_loop_in_deposit",
"find_first_loop",
"find_shortest_loop",
"find_longest_loop",
"sequential_search",
"is_backwards_connected",
"is_chain",
"is_forward_connected",
"is_loop",
"length",
"filter_short_edges",
"Edge",
"longest_chain",
"sequential_search_all",
"sequential_search",
"shortest_chain",
"TimeoutError",
]
ABS_TOL = 1e-12
GAP_TOL = 1e-12
Expand All @@ -45,13 +53,6 @@ class TimeoutError(EdgeMinerException):
pass


class DuplicateEdgesError(EdgeMinerException):
pass


class StopSearchException(EdgeMinerException):
pass


class Edge:
"""Represents an edge.
Expand Down Expand Up @@ -116,33 +117,109 @@ def isclose(a: Vec3, b: Vec3, gap_tol=GAP_TOL) -> bool:
"""This function should be used to test whether two vertices are close to each other
to get consistent results.
"""
return a.distance(b) < gap_tol
return a.distance(b) <= gap_tol


def is_forward_connected(a: Edge, b: Edge, gap_tol=GAP_TOL) -> bool:
"""Returns ``True`` if the edges have a forward connection."""
"""Returns ``True`` if the edges have a forward connection.
Forward connection: a.end is connected to b.start
Args:
a: first edge
b: second edge
gap_tol: maximum vertex distance to consider two edges as connected
"""
return isclose(a.end, b.start, gap_tol)


def is_backwards_connected(a: Edge, b: Edge, gap_tol=GAP_TOL) -> bool:
"""Returns ``True`` if the edges have a backward connection."""
"""Returns ``True`` if the edges have a backward connection.
Backwards connection: a.start is connected to b.end
Args:
a: first edge
b: second edge
gap_tol: maximum vertex distance to consider two edges as connected
"""
return isclose(a.start, b.end, gap_tol)


def is_chain(edges: Sequence[Edge], gap_tol=GAP_TOL) -> bool:
"""Returns ``True`` if all edges are connected forward.
Forward connection: edge[n].end is connected to edge[n+1].start
Args:
edges: sequence of edges
gap_tol: maximum vertex distance to consider two edges as connected
"""
return all(is_forward_connected(a, b, gap_tol) for a, b in zip(edges, edges[1:]))


def is_loop(edges: Sequence[Edge], gap_tol=GAP_TOL, full=True) -> bool:
if full:
if not all(
is_forward_connected(a, b, gap_tol) for a, b in zip(edges, edges[1:])
):
return False
"""Return ``True`` if the sequence of edges is a closed forward loop.
Forward connection: edge[n].end is connected to edge[n+1].start
Args:
edges: sequence of edges
gap_tol: maximum vertex distance to consider two edges as connected
full: does a full check if all edges are connected if ``True``, otherwise checks
only if the last edge is connected to the first edge.
"""
if full and not is_chain(edges, gap_tol):
return False
return is_forward_connected(edges[-1], edges[0])


def length(edges: Sequence[Edge]) -> float:
"""Returns the length of a sequence of edges."""
return sum(e.length for e in edges)


def shortest_chain(chains: Iterable[Sequence[Edge]]) -> Sequence[Edge]:
"""Returns the shortest chain of connected edges.
.. Note::
This function does not verify if the input sequences are connected edges!
"""
sorted_chains = sorted(chains, key=length)
if sorted_chains:
return sorted_chains[0]
return tuple()


def longest_chain(chains: Iterable[Sequence[Edge]]) -> Sequence[Edge]:
"""Returns the longest chain of connected edges.
.. Note::
This function does not verify if the input sequences are connected edges!
"""
sorted_chains = sorted(chains, key=length)
if sorted_chains:
return sorted_chains[-1]
return tuple()


def sequential_search(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Edge]:
"""Returns all consecutive connected edges starting from the first edge.
The search stops at the first edge without a connection.
Args:
edges: edges to be examined
gap_tol: maximum vertex distance to consider two edges as connected
Raises:
TypeError: invalid data in sequence `edges`
"""
edges = type_check(edges)
if len(edges) < 2:
return edges
chain = [edges[0]]
Expand All @@ -159,6 +236,26 @@ def sequential_search(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Edge]:
return chain


def sequential_search_all(
edges: Sequence[Edge], gap_tol=GAP_TOL
) -> Iterator[Sequence[Edge]]:
"""Yields all edge strings with consecutive connected edges starting from the first
edge. This search starts a new sequence at every edge without a connection to
the previous sequence. Each sequence has one or more edges, yields no empty sequences.
Args:
edges: edges to be examined
gap_tol: maximum vertex distance to consider two edges as connected
Raises:
TypeError: invalid data in sequence `edges`
"""
while edges:
chain = sequential_search(edges, gap_tol)
edges = edges[len(chain) :]
yield chain


class Watchdog:
def __init__(self, timeout=TIMEOUT) -> None:
self.timeout: float = timeout
Expand All @@ -173,46 +270,15 @@ def has_timed_out(self) -> bool:
return time.perf_counter() - self.start_time > self.timeout


def length(edges: Sequence[Edge]) -> float:
"""Returns the length of a sequence of edges."""
return sum(e.length for e in edges)


def find_shortest_loop(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Edge]:
"""Returns the shortest closed loop found.
Note: Recursive backtracking algorithm with time complexity of O(n!).
Args:
edges: edges to be examined
gap_tol: maximum vertex distance to consider two edges as connected
"""
solutions = sorted(find_all_loops(edges, gap_tol=gap_tol), key=length)
if solutions:
return solutions[0]
return tuple()


def find_longest_loop(edges: Sequence[Edge], gap_tol=GAP_TOL) -> Sequence[Edge]:
"""Returns the longest closed loop found.
Note: Recursive backtracking algorithm with time complexity of O(n!).
Args:
edges: edges to be examined
gap_tol: maximum vertex distance to consider two edges as connected
"""
solutions = sorted(find_all_loops(edges, gap_tol=gap_tol), key=length)
if solutions:
return solutions[-1]
return tuple()


def find_first_loop(
edges: Sequence[Edge], gap_tol=GAP_TOL, timeout=TIMEOUT
) -> Sequence[Edge]:
"""Returns the first closed loop found in `edges`.
.. Note::
Recursive backtracking algorithm with time complexity of O(n!).
Args:
edges: edges to be examined
gap_tol: maximum vertex distance to consider two edges as connected
Expand All @@ -231,6 +297,10 @@ def find_first_loop(
def find_first_loop_in_deposit(deposit: EdgeDeposit, timeout=TIMEOUT) -> Sequence[Edge]:
"""Returns the first closed loop found in edge `deposit`.
.. Note::
Recursive backtracking algorithm with time complexity of O(n!).
Args:
deposit: edge deposit
timeout: timeout in seconds
Expand All @@ -255,7 +325,9 @@ def find_all_loops(
) -> Sequence[Sequence[Edge]]:
"""Returns all unique closed loops and doesn't include reversed solutions.
Note: Recursive backtracking algorithm with time complexity of O(n!).
.. Note::
Recursive backtracking algorithm with time complexity of O(n!).
Args:
edges: edges to be examined
Expand All @@ -278,7 +350,9 @@ def find_all_loops_in_deposit(
"""Returns all unique closed loops in found in the edge `deposit` and doesn't
include reversed solutions.
Note: Recursive backtracking algorithm with time complexity of O(n!).
.. Note::
Recursive backtracking algorithm with time complexity of O(n!).
Args:
deposit: edge deposit
Expand All @@ -304,15 +378,6 @@ def type_check(edges: Sequence[Edge]) -> Sequence[Edge]:
return edges


def filter_short_edges(edges: Iterable[Edge], gap_tol=GAP_TOL) -> tuple[Edge, ...]:
"""Removes all edges where the start vertex is very close to the end vertex.
These edges represent very short curves or maybe closed curves like circles and
ellipses.
"""
return tuple(e for e in edges if not isclose(e.start, e.end, gap_tol))


class Loop:
"""Represents connected edges.
Expand Down
48 changes: 36 additions & 12 deletions tests/test_05_tools/test_546_edgeminer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,6 @@ def test_edge_can_be_used_in_sets(self):
assert B in result


def test_filter_short_edges():
A = em.Edge((0, 0), (0, 0))
B = em.Edge((1, 0), (1, 1))
result = em.filter_short_edges([A, B])
assert len(result) == 1
assert result[0] is B


class TestLoop:
# +-C-+
# | |
Expand Down Expand Up @@ -100,6 +92,10 @@ def collect_payload(edges: Sequence[em.Edge]) -> str:
Key order:
Loop starts with the edge with the smallest id.
"""
if len(edges) == 0:
return ""
elif len(edges) == 1:
return edges[0].payload # type: ignore
loop = em.Loop(edges) # type: ignore
return ",".join([e.payload for e in loop.ordered()])

Expand Down Expand Up @@ -136,6 +132,34 @@ def test_sequential_forward_search(self):
assert result[-1] is self.F


def test_sequential_search_all():
# 0 1 2 3
# 1 +-C-+-I-+-G-+
# | | | |
# D B H F
# | | | |
# 0 +-A-+-J-+-E-+

A = em.Edge((0, 0), (1, 0), payload="A")
B = em.Edge((1, 0), (1, 1), payload="B")
C = em.Edge((1, 1), (0, 1), payload="C")
D = em.Edge((0, 1), (0, 0), payload="D")
E = em.Edge((2, 0), (3, 0), payload="E")
F = em.Edge((3, 0), (3, 1), payload="F")
G = em.Edge((3, 1), (2, 1), payload="G")
H = em.Edge((2, 1), (2, 0), payload="H")
I = em.Edge((1, 1), (2, 1), payload="I")
J = em.Edge((1, 0), (2, 0), payload="J")

edges = [A, B, C, D, E, F, G, H, I, J]
result = list(em.sequential_search_all(edges))
assert len(result) == 4
assert collect_payload(result[0]) == "A,B,C,D"
assert collect_payload(result[1]) == "E,F,G,H"
assert collect_payload(result[2]) == "I"
assert collect_payload(result[3]) == "J"


class SimpleLoops:

# 0 1 2
Expand Down Expand Up @@ -224,15 +248,15 @@ def test_find_first_loop(self):
assert len(solution) >= 4 # any loop is a valid solution

def test_find_shortest_loop(self):
solution = em.find_shortest_loop(
(self.A, self.B, self.C, self.D, self.E, self.F, self.G)
solution = em.shortest_chain(
em.find_all_loops((self.A, self.B, self.C, self.D, self.E, self.F, self.G))
)
assert len(solution) == 4
assert collect_payload(solution) == "A,B,C,D"

def test_find_longest_loop(self):
solution = em.find_longest_loop(
(self.A, self.B, self.C, self.D, self.E, self.F, self.G)
solution = em.longest_chain(
em.find_all_loops((self.A, self.B, self.C, self.D, self.E, self.F, self.G))
)
assert len(solution) == 6
assert collect_payload(solution) == "A,E,F,G,C,D"
Expand Down

0 comments on commit 1aa861f

Please sign in to comment.