-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathsignature.py
216 lines (192 loc) · 6.52 KB
/
signature.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# -*- coding: utf-8 -*-
#! /usr/bin/env python3
from scapy.all import Ether, IP, ICMP
from copy import deepcopy
from typing import Tuple, Union
def switch_directions(signatur: 'Signature') -> Tuple['Signature', 'Signature']:
"""
Switches the source and destination directions of a given signature.
Parameters
----------
signatur : Signature
The signature to switch directions for.
Returns
-------
Tuple[Signature, Signature]
A tuple containing the original signature with its direction set to '->'
and the switched signature with its source and destination swapped.
"""
srcdst = deepcopy(signatur)
srcdst.dir = '->'
dstsrc = deepcopy(signatur)
dstsrc.dir = '->'
dstsrc.src_ip = dstsrc.dst_ip
dstsrc.src_port = dstsrc.dst_port
dstsrc.dst_ip = srcdst.src_ip
dstsrc.dst_port = srcdst.src_port
return srcdst, dstsrc
def not_eq(other_: str, self_: str, normal: bool = True) -> bool:
"""
Checks inequality between two values with special handling for 'any', '!', and ranges.
Parameters
----------
other_ : str
The value to compare against.
self_ : str
The value to be compared.
normal : bool, optional
Whether to perform a normal comparison (default is True).
Returns
-------
bool
True if the values are not equal based on the rules, False otherwise.
"""
if normal:
if other_ == 'IP' and self_ in ['TCP', 'UDP']:
return False
else:
return self_ == other_[1:] if other_[0] == '!' else self_ != other_
else:
if self_ == 'any':
return False
split = other_.split('!')
if '-' in other_:
split_split = split[-1].split('-')
min_ = split_split[0][1:]
max_ = split_split[1][:-1]
else:
min_ = split[-1]
max_ = split[-1]
other_ = range(int(min_), int(max_)+1)
try:
self_ = int(self_)
except ValueError:
print(f"no meaning full compare/TODO: {self_}")
return True
else:
return (len(split) == 1 and self_ not in other_) or\
(len(split) == 2 and self_ in other_)
class Signature(object):
"""
A class used to represent a network packet signature.
This class can be initialized with either a scapy packet or a string, and it
extracts or parses the relevant information to create a signature for comparison.
Attributes
----------
s_id : str
The ID of the signature.
proto : str
The protocol of the packet.
src_ip : str
The source IP address.
src_port : str
The source port.
dir : str
The direction of the packet ('->', '<>', etc.).
dst_ip : str
The destination IP address.
dst_port : str
The destination port.
payload : str
The payload of the packet.
Methods
-------
__str__():
Returns the string representation of the signature.
__repr__():
Returns the representation of the rule ID.
__eq__(other):
Checks if the signature is equal to another signature.
"""
def __init__(self, obj):
super(Signature, self).__init__()
if isinstance(obj, Ether):
direction = '->'
s_id = '-1'
if IP in obj:
proto = obj[2].name
src_ip = str(obj[1].src)
dst_ip = str(obj[1].dst)
payload = '*'
try:
src_port = str(obj[1].sport)
dst_port = str(obj[1].dport)
except AttributeError:
if ICMP in obj:
src_port = 'any'
dst_port = 'any'
else:
raise ValueError()
except IndexError:
raise ValueError()
else:
raise ValueError()
elif isinstance(obj, str):
string = obj.split(' ')
if len(string) == 5:
src_split = string[1].split(':')
dst_split = string[3].split(':')
s_id = ''
proto = string[0]
src_ip = src_split[0]
src_port = src_split[1]
direction = string[2]
dst_ip = dst_split[0]
dst_port = dst_split[1]
payload = string[4]
elif len(string) == 6:
src_split = string[2].split(':')
dst_split = string[4].split(':')
s_id = string[0].split(':')[0]
proto = string[1]
src_ip = src_split[0]
src_port = src_split[1]
direction = string[3]
dst_ip = dst_split[0]
dst_port = dst_split[1]
payload = string[5]
else:
raise ValueError(obj, 'cant be initialized')
del obj
self.s_id = s_id
self.proto = proto
self.src_ip = src_ip
self.src_port = src_port
self.dir = direction
self.dst_ip = dst_ip
self.dst_port = dst_port
self.payload = payload
def __str__(self):
return f"{self.proto} {self.src_ip}:{self.src_port} {self.dir} {self.dst_ip}:{self.dst_port} {self.payload}"
def __repr__(self):
return f"ruleID {self.s_id}"
def __eq__(self, other):
"""
not commutative
self always without !/any/<>/portRange
"""
if isinstance(self, other.__class__):
if other.dir == '<>':
dir_a, dir_b = switch_directions(other)
return self.__eq__(dir_a) or self.__eq__(dir_b)
if other.proto != 'any':
if not_eq(other.proto, self.proto):
return False
if other.src_ip != 'any':
if not_eq(other.src_ip, self.src_ip):
return False
if other.src_port != 'any':
if not_eq(other.src_port, self.src_port, 0):
return False
if other.dst_ip != 'any':
if not_eq(other.dst_ip, self.dst_ip):
return False
if other.dst_port != 'any':
if not_eq(other.dst_port, self.dst_port, 0):
return False
if other.payload != 'any':
if self.payload != other.payload:
return False
return True
else:
return False