diff --git a/src/my_portscanner/__init__.py b/src/my_portscanner/__init__.py index 8573e89..85f634a 100644 --- a/src/my_portscanner/__init__.py +++ b/src/my_portscanner/__init__.py @@ -6,6 +6,7 @@ from .get_datetime import get_datetime_now from .scan_tools import ConnectScan from .scan_tools import SynScan +from .scan_tools import UdpScan from .version import __version__ @@ -50,6 +51,14 @@ def main(): max_parallelism=args["max_parallelism"], no_ping=args["no_ping"], ) + elif args["scan_type"] == "udp": + scan = UdpScan( + target_ip=target_ip, + target_port_list=args["port"], + max_rtt_timeout=args["max_rtt_timeout"], + max_parallelism=args["max_parallelism"], + no_ping=args["no_ping"], + ) else: print("invalid scan type") sys.exit(1) diff --git a/src/my_portscanner/options.py b/src/my_portscanner/options.py index 8c29a40..ea986c4 100644 --- a/src/my_portscanner/options.py +++ b/src/my_portscanner/options.py @@ -26,6 +26,12 @@ def parse_args() -> dict: action="store_true", help="TCP SYN scan", ) + parser.add_argument( + "-sU", + "--udp_scan", + action="store_true", + help="UDP scan", + ) parser.add_argument( "-p", "--port", @@ -63,15 +69,19 @@ def parse_args() -> dict: p = parser.parse_args() try: - port_list = _create_port_list(p.port) + scan_type = _select_scan_type(p.connect_scan, p.stealth_scan, p.udp_scan) except ValueError as e: print(e) sys.exit(1) - if p.stealth_scan: - scan_type = "stealth" - else: - scan_type = "connect" + try: + if scan_type == "udp": + port_list = _create_port_list(p.port, is_udp=True) + else: + port_list = _create_port_list(p.port) + except ValueError as e: + print(e) + sys.exit(1) args = { "target_ip": p.target_ip, @@ -85,7 +95,7 @@ def parse_args() -> dict: return args -def _create_port_list(port: Union[str, None]) -> list[int]: +def _create_port_list(port: Union[str, None], is_udp=False) -> list[int]: """_create_port_list. -p オプションの引数をリストに変換する @@ -100,6 +110,8 @@ def _create_port_list(port: Union[str, None]) -> list[int]: return [int(port)] # all port if port == "-": + if is_udp: + return list(range(1, 1024)) return list(range(0, 65536)) # ,区切りをリストに変換 @@ -116,3 +128,31 @@ def _create_port_list(port: Union[str, None]) -> list[int]: raise ValueError("[-p] port range start is larger than end") port_list = [int(x) for x in range(int(port_start_str), int(port_end_str) + 1)] return port_list + + +def _select_scan_type(connect_scan: bool, stealth_scan: bool, udp_scan: bool) -> str: + """_summary_ + 複数のオプションが同時に指定されないようにしつつ,適切なスキャンタイプを返す + Args: + connect_scan: bool + stealth_scan: bool + udp_scan: bool + + Returns: + str: scan_type + """ + # 複数のオプションが指定されていたらエラー + options_count = sum([connect_scan, stealth_scan, udp_scan]) + if options_count > 1: + raise ValueError("[-sT] [-sS] [-sU] options are exclusive.") + + # default options + if options_count == 0: + return "connect" + + if stealth_scan: + return "stealth" + elif udp_scan: + return "udp" + else: + return "connect" diff --git a/src/my_portscanner/scan_tools/Scan.py b/src/my_portscanner/scan_tools/Scan.py index 4eb930a..234f10e 100644 --- a/src/my_portscanner/scan_tools/Scan.py +++ b/src/my_portscanner/scan_tools/Scan.py @@ -122,11 +122,16 @@ def print_result(self) -> None: for port_info in self.scan_result if port_info["state"] != "closed" ] + if self.__class__.__name__ == "UdpScan": + trans_port_protocol = "udp" + else: + trans_port_protocol = "tcp" # port6桁+/tcpで10桁 - print(f"{"PORT":<10} {"STATE":<8} SERVICE") + # print(f"{"PORT":<10} {"STATE":<8} SERVICE") FIXME: 文字列の最大値を使うように変更 + print(f"{"PORT":<10} {"STATE":<13} SERVICE") for port_info in self.scan_result: print( - f"{port_info["port"]}/tcp".ljust(10) + f"{port_info["port"]}/{trans_port_protocol}".ljust(10) + " " - + f"{port_info["state"]:<8} unknown" + + f"{port_info["state"]:<13} unknown" ) diff --git a/src/my_portscanner/scan_tools/SynScan.py b/src/my_portscanner/scan_tools/SynScan.py index d7eabd5..ab6a6ca 100644 --- a/src/my_portscanner/scan_tools/SynScan.py +++ b/src/my_portscanner/scan_tools/SynScan.py @@ -6,6 +6,7 @@ class SynScan(Scan): + def run(self) -> list[dict]: """ Run SYN scan diff --git a/src/my_portscanner/scan_tools/UdpScan.py b/src/my_portscanner/scan_tools/UdpScan.py new file mode 100644 index 0000000..41783e6 --- /dev/null +++ b/src/my_portscanner/scan_tools/UdpScan.py @@ -0,0 +1,45 @@ +# coding: utf-8 +from scapy.all import UDP, IP, sr1, ICMP, conf +import asyncio +from .Scan import Scan + + +class UdpScan(Scan): + def run(self) -> list[dict]: + """ + Run UDP scan + Returns: + scan_result: list[dict] + e.g: [{"port": port, "state": "open"}, {"port": port, "state": "closed"}, ...] + """ + if not self.no_ping: + self._get_latency() + self.scan_result = asyncio.run(self._async_run()) + # NOTE: 非同期処理により複数回PermissoinErrorが上がらないようにするため,例外の伝播を行っている。 + return self.scan_result + + def _port_scan(self, port: int) -> dict: + """ + Run UDP scan for a single port + 非同期処理で実行される関数 + Args: + port int: port_number + Returns: + dict: {"port": port, "state": state} + """ + conf.verb = 0 # packet送信時のログをSTDOUTに表示しない + udp_packet = IP(dst=self.target_ip) / UDP(dport=port) + + response = sr1(udp_packet, timeout=self.max_rtt_timeout / 1000) + + # timeoutしてresponseなしの場合はportが開いているのかFWによってICMPメッセージがフィルタリングされているかの判断ができないため,open|filteredとする。 + if response is None: + return {"port": port, "state": "open|filtered"} + elif ( + (response.haslayer(ICMP)) + and (response.getlayer(ICMP).type == 3) + and (response.getlayer(ICMP).code == 3) + ): + return {"port": port, "state": "closed"} + else: + return {"port": port, "state": "unknown"} diff --git a/src/my_portscanner/scan_tools/__init__.py b/src/my_portscanner/scan_tools/__init__.py index 3c63998..f6ada36 100644 --- a/src/my_portscanner/scan_tools/__init__.py +++ b/src/my_portscanner/scan_tools/__init__.py @@ -1,2 +1,3 @@ from .ConnectScan import ConnectScan from .SynScan import SynScan +from .UdpScan import UdpScan diff --git a/src/tests/scan_tools/test_ConnectScan.py b/src/tests/scan_tools/test_ConnectScan.py index b0f43a2..eac8ab1 100644 --- a/src/tests/scan_tools/test_ConnectScan.py +++ b/src/tests/scan_tools/test_ConnectScan.py @@ -83,11 +83,11 @@ def test_print_result(self): output = captured_output.getvalue().strip() expected_output = ( - f"{'PORT':<10} {'STATE':<8} SERVICE\n" - f"{'22/tcp':<10}" + " " + f"{'filtered':<8} unknown\n" - f"{'80/tcp':<10}" + " " + f"{'open':<8} unknown\n" - f"{'443/tcp':<10}" + " " + f"{'open':<8} unknown\n" - f"{'8080/tcp':<10}" + " " + f"{'closed':<8} unknown" + f"{'PORT':<10} {'STATE':<13} SERVICE\n" + f"{'22/tcp':<10}" + " " + f"{'filtered':<13} unknown\n" + f"{'80/tcp':<10}" + " " + f"{'open':<13} unknown\n" + f"{'443/tcp':<10}" + " " + f"{'open':<13} unknown\n" + f"{'8080/tcp':<10}" + " " + f"{'closed':<13} unknown" ) self.assertEqual(output, expected_output) @@ -118,7 +118,7 @@ def test_print_result_remove_closed_ports_when_scan_result_is_long(self): output = captured_output.getvalue().strip() # 何も表示されないことを確認 - expected_output = f"{'PORT':<10} {'STATE':<8} SERVICE" + expected_output = f"{'PORT':<10} {'STATE':<13} SERVICE" self.assertEqual(output, expected_output) diff --git a/src/tests/scan_tools/test_SynScan.py b/src/tests/scan_tools/test_SynScan.py index a78bb2b..7977dad 100644 --- a/src/tests/scan_tools/test_SynScan.py +++ b/src/tests/scan_tools/test_SynScan.py @@ -8,6 +8,10 @@ class TestSynScan(unittest.TestCase): + """ + NOTE: print_result()関連のtestはConnectScan.pyと同じため省略 + """ + def setUp(self): """ テスト実行時に毎回実行され, diff --git a/src/tests/scan_tools/test_UdpScan.py b/src/tests/scan_tools/test_UdpScan.py new file mode 100644 index 0000000..a0396ea --- /dev/null +++ b/src/tests/scan_tools/test_UdpScan.py @@ -0,0 +1,105 @@ +from io import StringIO +import sys +import unittest +from unittest.mock import patch, MagicMock +from scapy.all import UDP, ICMP +from my_portscanner.scan_tools.UdpScan import UdpScan + + +class TestUdpScan(unittest.TestCase): + """ + NOTE: print_result()の一部テストはtest_ConnectScan.pyと共通なので省略 + """ + + def setUp(self): + """ + テスト実行時に毎回実行され, + socket.socketのmockしたインスタンスを作成する。 + """ + # 共通のテストデータ + self.target_ip = "192.168.150.2" + self.target_port_list = [68, 123, 135] + self.expected_open_filterd_ports = [68, 123] # dhcp, ntp + self.expected_closed_ports = [135] # msrpc + self.max_rtt_timeout = 100 + self.max_parallelism = 16 + + def sr1_side_effect(packet, timeout): + if packet[UDP].dport in self.expected_open_filterd_ports: + return None + elif packet[UDP].dport in self.expected_closed_ports: + mock_response = MagicMock() + # ICMP layerがあるかどうか + mock_response.haslayer.side_effect = lambda x: x == ICMP + # mock_response[ICMP].type = 3 + # mock_response[ICMP].code = 3 + mock_icmp = MagicMock() + mock_icmp.type = 3 + mock_icmp.code = 3 + mock_response.getlayer.return_value = mock_icmp + return mock_response + + self.sr1_side_effect = sr1_side_effect + + @patch("my_portscanner.scan_tools.UdpScan.sr1") + def test_port_scan(self, mock_sr1): + mock_sr1.side_effect = self.sr1_side_effect + + scan = UdpScan( + target_ip=self.target_ip, + target_port_list=self.target_port_list, + max_rtt_timeout=self.max_rtt_timeout, + max_parallelism=self.max_parallelism, + no_ping=False, + ) + scan_result = [] + for port in self.target_port_list: + scan_result.append(scan._port_scan(port)) + self.assertEqual( + scan_result, + [ + {"port": 68, "state": "open|filtered"}, + {"port": 123, "state": "open|filtered"}, + {"port": 135, "state": "closed"}, + ], + ) + + def test_print_result(self): + """_summary_ + print_resultで想定通りの出力がでるか確認するテスト + """ + scan = UdpScan( + target_ip=self.target_ip, + target_port_list=self.target_port_list, + max_rtt_timeout=self.max_rtt_timeout, + max_parallelism=self.max_parallelism, + no_ping=True, + ) + scan.scan_result = [ + {"port": 68, "state": "open|filtered"}, + {"port": 123, "state": "open|filtered"}, + {"port": 135, "state": "closed"}, + ] + + # 標準出力をキャプチャ + captured_output = StringIO() + sys.stdout = captured_output + + scan.print_result() + + # 標準出力の内容を取得 + sys.stdout = sys.__stdout__ + output = captured_output.getvalue().strip() + + expected_output = ( + f"{'PORT':<10} {'STATE':<13} SERVICE\n" + f"{'68/udp':<10}" + " " + f"{'open|filtered':<13} unknown\n" + f"{'123/udp':<10}" + " " + f"{'open|filtered':<13} unknown\n" + f"{'135/udp':<10}" + " " + f"{'closed':<13} unknown" + ) + + self.assertEqual(output, expected_output) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/tests/test_options.py b/src/tests/test_options.py index 0c479f1..32818c1 100644 --- a/src/tests/test_options.py +++ b/src/tests/test_options.py @@ -62,7 +62,7 @@ def test_parse_args_port_range(): assert args["port"] == [22, 23, 24, 25, 26, 27, 28, 29, 30] -def test_create_port_list_non_digit_list(): +def test_create_port_list_non_digit_lest(): """ -p hoge,32のようなポートレンジ指定機能のテスト """ @@ -100,11 +100,43 @@ def test_create_port_list_start_larger_than_end(): def test_create_port_list_all_port(): """ - -p 32-22のようなポートレンジ指定機能のテスト + -p- オールポートレンジ指定機能のテスト + tcpは0~65535, udpは1~1023 """ port = "-" + assert list(range(0, 65536)) == options._create_port_list(port) + assert list(range(1, 1024)) == options._create_port_list(port, is_udp=True) + + +def test_create_port_list_single_port(): + """ + -p 22のようなポートレンジ指定機能のテスト + """ + port = "22" port_list = options._create_port_list(port) - assert port_list == list(range(0, 65536)) + assert port_list == [22] + + +def test_select_scan_type_connect(): + """ + _select_scan_type()のテスト + """ + # default + assert "connect" == options._select_scan_type(False, False, False) + + assert "connect" == options._select_scan_type(True, False, False) + assert "stealth" == options._select_scan_type(False, True, False) + assert "udp" == options._select_scan_type(False, False, True) + + +def test_select_scan_type_error(): + """ + _select_scan_type()で複数のスキャン方法が指定されている場合にValueErrorが発生することを確認 + """ + try: + options._select_scan_type(True, True, False) + except ValueError as e: + assert str(e) == "[-sT] [-sS] [-sU] options are exclusive." if __name__ == "__main__":