diff --git a/.github/workflows/integration-test.yaml b/.github/workflows/integration-test.yaml index 24fc2d1..f23db0a 100644 --- a/.github/workflows/integration-test.yaml +++ b/.github/workflows/integration-test.yaml @@ -30,5 +30,7 @@ jobs: docker compose up hacking_lab_app & docker compose run my_portscanner_app -sS 172.18.0.3 -p 22 -d docker compose run my_portscanner_app -sS 172.18.0.3 -p 22,80 --max-parallelism 16 --max-rtt-timeout 200 -d + docker compose run my_portscanner_app -sS 172.18.0.3 -p20-30 --max-rtt-timeout 100 -d -Pn docker compose run my_portscanner_app -sT 172.18.0.3 -p20-30 --max-rtt-timeout 100 -d + docker compose run my_portscanner_app -sT 172.18.0.3 -p20-30 --max-rtt-timeout 100 -d -Pn diff --git a/src/my_portscanner/__init__.py b/src/my_portscanner/__init__.py index bbf96d7..8573e89 100644 --- a/src/my_portscanner/__init__.py +++ b/src/my_portscanner/__init__.py @@ -40,6 +40,7 @@ def main(): target_port_list=args["port"], max_rtt_timeout=args["max_rtt_timeout"], max_parallelism=args["max_parallelism"], + no_ping=args["no_ping"], ) elif args["scan_type"] == "stealth": scan = SynScan( @@ -47,6 +48,7 @@ def main(): target_port_list=args["port"], max_rtt_timeout=args["max_rtt_timeout"], max_parallelism=args["max_parallelism"], + no_ping=args["no_ping"], ) else: print("invalid scan type") diff --git a/src/my_portscanner/options.py b/src/my_portscanner/options.py index 0fc1cf1..8c29a40 100644 --- a/src/my_portscanner/options.py +++ b/src/my_portscanner/options.py @@ -54,6 +54,12 @@ def parse_args() -> dict: action="store_true", help="display debug info", ) + parser.add_argument( + "-Pn", + "--no-ping", + action="store_true", + help="no ping sent before scanning", + ) p = parser.parse_args() try: @@ -74,6 +80,7 @@ def parse_args() -> dict: "max_rtt_timeout": p.max_rtt_timeout, "max_parallelism": p.max_parallelism, "debug": p.debug, + "no_ping": p.no_ping, } return args diff --git a/src/my_portscanner/scan_tools/ConnectScan.py b/src/my_portscanner/scan_tools/ConnectScan.py index 42bfc6d..0914276 100644 --- a/src/my_portscanner/scan_tools/ConnectScan.py +++ b/src/my_portscanner/scan_tools/ConnectScan.py @@ -14,6 +14,8 @@ def run(self) -> list[dict]: scan_result: list[dict] e.g: [{"port": port, "state": "open"}, "port": port, "state": "closed"} ...] """ + if not self.no_ping: + self._get_latency() self.scan_result = asyncio.run(self._async_run()) return self.scan_result diff --git a/src/my_portscanner/scan_tools/Scan.py b/src/my_portscanner/scan_tools/Scan.py index c324b4a..4eb930a 100644 --- a/src/my_portscanner/scan_tools/Scan.py +++ b/src/my_portscanner/scan_tools/Scan.py @@ -1,6 +1,8 @@ # coding: utf-8 import asyncio from abc import ABC, abstractmethod +import time +from scapy.all import sr1, IP, ICMP class Scan(ABC): @@ -11,7 +13,14 @@ class Scan(ABC): max_parallelism: int scan_result: list[dict] - def __init__(self, target_ip: str, target_port_list: list[int], max_rtt_timeout: int, max_parallelism: int): + def __init__( + self, + target_ip: str, + target_port_list: list[int], + max_rtt_timeout: int, + max_parallelism: int, + no_ping: bool, + ): """_summary_ constructor Args: @@ -24,6 +33,7 @@ def __init__(self, target_ip: str, target_port_list: list[int], max_rtt_timeout: self.target_port_list = target_port_list self.max_rtt_timeout = max_rtt_timeout self.max_parallelism = max_parallelism + self.no_ping = no_ping self.scan_result = [] def __str__(self) -> str: @@ -34,7 +44,20 @@ def __str__(self) -> str: target_port_list_fmt = "all" else: target_port_list_fmt = self.target_port_list - return f"Scan(target_ip={self.target_ip}, target_port_list={target_port_list_fmt}, scan_type={self.__class__.__name__}, max_rtt_timeout={self.max_rtt_timeout}, max_parallelism={self.max_parallelism})" + return f"Scan(target_ip={self.target_ip}, target_port_list={target_port_list_fmt}, scan_type={self.__class__.__name__}, max_rtt_timeout={self.max_rtt_timeout}, max_parallelism={self.max_parallelism}, Pn={self.no_ping})" + + def _get_latency(self) -> None: + """_summary_ + latency情報を取得するためICMPパケットを送信する + """ + packet = IP(dst=self.target_ip) / ICMP() + start_time = time.time() + response = sr1(packet, timeout=self.max_rtt_timeout, verbose=0) + if response: + latency = time.time() - start_time + print(f"Host is up ({latency}s latency).") + else: + print("Host may be down.") @abstractmethod def run(self) -> list[dict]: @@ -95,7 +118,9 @@ def print_result(self) -> None: # 出力が100行を超えそうなときは,closed portsを非表示にする。 if len(self.scan_result) > 100: self.scan_result = [ - port_info for port_info in self.scan_result if port_info["state"] != "closed" + port_info + for port_info in self.scan_result + if port_info["state"] != "closed" ] # port6桁+/tcpで10桁 print(f"{"PORT":<10} {"STATE":<8} SERVICE") diff --git a/src/my_portscanner/scan_tools/SynScan.py b/src/my_portscanner/scan_tools/SynScan.py index 93d8153..d7eabd5 100644 --- a/src/my_portscanner/scan_tools/SynScan.py +++ b/src/my_portscanner/scan_tools/SynScan.py @@ -13,6 +13,8 @@ def run(self) -> list[dict]: scan_result: list[dict] e.g: [{"port": port, "state": "open"}, {"port": port, "state": "closed"}, ...] """ + if not self.no_ping: + self._get_latency() try: self.scan_result = asyncio.run(self._async_run()) # NOTE: 非同期処理により複数回PermissoinErrorが上がらないようにするため,例外の伝播を行っている。 diff --git a/src/tests/scan_tools/test_ConnectScan.py b/src/tests/scan_tools/test_ConnectScan.py index 7d1cab2..b0f43a2 100644 --- a/src/tests/scan_tools/test_ConnectScan.py +++ b/src/tests/scan_tools/test_ConnectScan.py @@ -39,6 +39,7 @@ def connext_ex_side_effect(target_tuple): target_port_list=self.target_port_list, max_rtt_timeout=self.max_rtt_timeout, max_parallelism=self.max_parallelism, + no_ping=False, ) scan_result = [] for port in self.target_port_list: @@ -62,6 +63,7 @@ def test_print_result(self): target_port_list=self.target_port_list, max_rtt_timeout=self.max_rtt_timeout, max_parallelism=self.max_parallelism, + no_ping=True, ) scan.scan_result = [ {"port": 22, "state": "filtered"}, @@ -99,6 +101,7 @@ def test_print_result_remove_closed_ports_when_scan_result_is_long(self): target_port_list=self.target_port_list, max_rtt_timeout=self.max_rtt_timeout, max_parallelism=self.max_parallelism, + no_ping=True, ) # 100行以上のテストデータを作成 diff --git a/src/tests/scan_tools/test_SynScan.py b/src/tests/scan_tools/test_SynScan.py index 7f981e0..a78bb2b 100644 --- a/src/tests/scan_tools/test_SynScan.py +++ b/src/tests/scan_tools/test_SynScan.py @@ -47,6 +47,7 @@ def test_run(self, mock_sr1): target_port_list=self.target_port_list, max_rtt_timeout=self.max_rtt_timeout, max_parallelism=self.max_parallelism, + no_ping=True, ) mock_sr1.side_effect = self.sr1_side_effect @@ -85,6 +86,7 @@ def test_run_raise_permission_error(self, mock_sr1): target_port_list=self.target_port_list, max_rtt_timeout=self.max_rtt_timeout, max_parallelism=self.max_parallelism, + no_ping=True, ) # NOTE: mock_getuid.return_valueによってuid=0以外にmockしてもうまくPermissionErrorを発生させることができなかったので直接PermissionErrorを発生させる diff --git a/src/tests/test_options.py b/src/tests/test_options.py index b3008ac..0c479f1 100644 --- a/src/tests/test_options.py +++ b/src/tests/test_options.py @@ -21,6 +21,7 @@ def test_parse_args_default(): assert args["max_rtt_timeout"] == 1000 assert args["max_parallelism"] is None assert args["debug"] is False + assert args["no_ping"] is False def test_parse_args_options(): @@ -38,6 +39,7 @@ def test_parse_args_options(): "--max-parallelism", "32", "-d", + "-Pn", ] sys.argv = test_args args = options.parse_args() @@ -47,6 +49,7 @@ def test_parse_args_options(): assert args["max_rtt_timeout"] == 1000 assert args["max_parallelism"] == 32 assert args["debug"] is True + assert args["no_ping"] is True def test_parse_args_port_range():