generated from RyosukeDTomita/template_repository_all
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #57 from RyosukeDTomita/feature/udpscan
add udpscan
- Loading branch information
Showing
10 changed files
with
260 additions
and
18 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ | |
|
||
|
||
class SynScan(Scan): | ||
|
||
def run(self) -> list[dict]: | ||
""" | ||
Run SYN scan | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
# coding: utf-8 | ||
from scapy.all import UDP, IP, sr1, ICMP, conf | ||
import asyncio | ||
from .Scan import Scan | ||
|
||
|
||
class UdpScan(Scan): | ||
def run(self) -> list[dict]: | ||
""" | ||
Run UDP scan | ||
Returns: | ||
scan_result: list[dict] | ||
e.g: [{"port": port, "state": "open"}, {"port": port, "state": "closed"}, ...] | ||
""" | ||
if not self.no_ping: | ||
self._get_latency() | ||
self.scan_result = asyncio.run(self._async_run()) | ||
# NOTE: 非同期処理により複数回PermissoinErrorが上がらないようにするため,例外の伝播を行っている。 | ||
return self.scan_result | ||
|
||
def _port_scan(self, port: int) -> dict: | ||
""" | ||
Run UDP scan for a single port | ||
非同期処理で実行される関数 | ||
Args: | ||
port int: port_number | ||
Returns: | ||
dict: {"port": port, "state": state} | ||
""" | ||
conf.verb = 0 # packet送信時のログをSTDOUTに表示しない | ||
udp_packet = IP(dst=self.target_ip) / UDP(dport=port) | ||
|
||
response = sr1(udp_packet, timeout=self.max_rtt_timeout / 1000) | ||
|
||
# timeoutしてresponseなしの場合はportが開いているのかFWによってICMPメッセージがフィルタリングされているかの判断ができないため,open|filteredとする。 | ||
if response is None: | ||
return {"port": port, "state": "open|filtered"} | ||
elif ( | ||
(response.haslayer(ICMP)) | ||
and (response.getlayer(ICMP).type == 3) | ||
and (response.getlayer(ICMP).code == 3) | ||
): | ||
return {"port": port, "state": "closed"} | ||
else: | ||
return {"port": port, "state": "unknown"} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
from .ConnectScan import ConnectScan | ||
from .SynScan import SynScan | ||
from .UdpScan import UdpScan |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
from io import StringIO | ||
import sys | ||
import unittest | ||
from unittest.mock import patch, MagicMock | ||
from scapy.all import UDP, ICMP | ||
from my_portscanner.scan_tools.UdpScan import UdpScan | ||
|
||
|
||
class TestUdpScan(unittest.TestCase): | ||
""" | ||
NOTE: print_result()の一部テストはtest_ConnectScan.pyと共通なので省略 | ||
""" | ||
|
||
def setUp(self): | ||
""" | ||
テスト実行時に毎回実行され, | ||
socket.socketのmockしたインスタンスを作成する。 | ||
""" | ||
# 共通のテストデータ | ||
self.target_ip = "192.168.150.2" | ||
self.target_port_list = [68, 123, 135] | ||
self.expected_open_filterd_ports = [68, 123] # dhcp, ntp | ||
self.expected_closed_ports = [135] # msrpc | ||
self.max_rtt_timeout = 100 | ||
self.max_parallelism = 16 | ||
|
||
def sr1_side_effect(packet, timeout): | ||
if packet[UDP].dport in self.expected_open_filterd_ports: | ||
return None | ||
elif packet[UDP].dport in self.expected_closed_ports: | ||
mock_response = MagicMock() | ||
# ICMP layerがあるかどうか | ||
mock_response.haslayer.side_effect = lambda x: x == ICMP | ||
# mock_response[ICMP].type = 3 | ||
# mock_response[ICMP].code = 3 | ||
mock_icmp = MagicMock() | ||
mock_icmp.type = 3 | ||
mock_icmp.code = 3 | ||
mock_response.getlayer.return_value = mock_icmp | ||
return mock_response | ||
|
||
self.sr1_side_effect = sr1_side_effect | ||
|
||
@patch("my_portscanner.scan_tools.UdpScan.sr1") | ||
def test_port_scan(self, mock_sr1): | ||
mock_sr1.side_effect = self.sr1_side_effect | ||
|
||
scan = UdpScan( | ||
target_ip=self.target_ip, | ||
target_port_list=self.target_port_list, | ||
max_rtt_timeout=self.max_rtt_timeout, | ||
max_parallelism=self.max_parallelism, | ||
no_ping=False, | ||
) | ||
scan_result = [] | ||
for port in self.target_port_list: | ||
scan_result.append(scan._port_scan(port)) | ||
self.assertEqual( | ||
scan_result, | ||
[ | ||
{"port": 68, "state": "open|filtered"}, | ||
{"port": 123, "state": "open|filtered"}, | ||
{"port": 135, "state": "closed"}, | ||
], | ||
) | ||
|
||
def test_print_result(self): | ||
"""_summary_ | ||
print_resultで想定通りの出力がでるか確認するテスト | ||
""" | ||
scan = UdpScan( | ||
target_ip=self.target_ip, | ||
target_port_list=self.target_port_list, | ||
max_rtt_timeout=self.max_rtt_timeout, | ||
max_parallelism=self.max_parallelism, | ||
no_ping=True, | ||
) | ||
scan.scan_result = [ | ||
{"port": 68, "state": "open|filtered"}, | ||
{"port": 123, "state": "open|filtered"}, | ||
{"port": 135, "state": "closed"}, | ||
] | ||
|
||
# 標準出力をキャプチャ | ||
captured_output = StringIO() | ||
sys.stdout = captured_output | ||
|
||
scan.print_result() | ||
|
||
# 標準出力の内容を取得 | ||
sys.stdout = sys.__stdout__ | ||
output = captured_output.getvalue().strip() | ||
|
||
expected_output = ( | ||
f"{'PORT':<10} {'STATE':<13} SERVICE\n" | ||
f"{'68/udp':<10}" + " " + f"{'open|filtered':<13} unknown\n" | ||
f"{'123/udp':<10}" + " " + f"{'open|filtered':<13} unknown\n" | ||
f"{'135/udp':<10}" + " " + f"{'closed':<13} unknown" | ||
) | ||
|
||
self.assertEqual(output, expected_output) | ||
|
||
|
||
if __name__ == "__main__": | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters