-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathanonymizepcap.py
executable file
·214 lines (169 loc) · 7.21 KB
/
anonymizepcap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#!/usr/bin/env python3
# Author: Matthaeus Wander <[email protected]>
# Dependencies:
import dpkt
import argparse
import hashlib
import ipaddress
import hmac
import os
import signal
import socket
import struct
import sys
IGNORE_ETHERTYPES = [
dpkt.ethernet.ETH_TYPE_ARP,
]
class NetworkList(object):
def __init__(self, netfile):
self.networks = []
self.cache = {}
with open(netfile, 'r') as fh:
for line in fh:
line = line.strip()
if line:
self.networks.append(ipaddress.ip_network(line))
def __iter__(self):
return self.networks.__iter__()
def __contains__(self, ip):
if ip in self.cache:
return self.cache[ip]
# Build cache
ipa = ipaddress.ip_address(ip)
for net in self.networks:
if ipa in net:
self.cache[ip] = True
return True
self.cache[ip] = False
return False
def __len__(self):
return self.networks.__len__()
def replace_address(ipmap, ip, offset, secret):
if ip not in ipmap:
assert len(ip) in (4, 16), 'ip address length is neither 4 nor 16'
is_ip4 = len(ip) == 4
# keyed-hash
if secret:
hmac_obj = hmac.new(secret, ip, hashlib.sha256)
mapped = hmac_obj.digest()
# first time seen
else:
newnum = len(ipmap) + 1
packvalue = '>I' if is_ip4 else '>Q'
mapped = struct.pack(packvalue, newnum)
if is_ip4:
# 1 offset byte + 3 bytes mapping
ipmap[ip] = offset + mapped[-3:]
else:
# 1 offset byte + 7 null bytes + 8 bytes mapping
ipmap[ip] = b'\x00' + offset + b'\x00'*6 + mapped[-8:]
return ipmap[ip]
def has_transport_sum(ip):
if not isinstance(ip.data, dpkt.udp.UDP) and not isinstance(ip.data, dpkt.tcp.TCP):
return False
if isinstance(ip, dpkt.ip.IP):
return ip.offset == 0
elif isinstance(ip, dpkt.ip6.IP6):
if dpkt.ip.IP_PROTO_FRAGMENT not in ip.extension_hdrs:
return False
ext = ip.extension_hdrs[dpkt.ip.IP_PROTO_FRAGMENT]
return ext.frag_off == 0
def main(inhandles, outfile, anonnets, offset, secret):
pcap_writer = None
ipmap = {}
for fh in inhandles:
pcap_reader = dpkt.pcap.Reader(fh)
linktype = pcap_reader.datalink()
if pcap_writer is None:
fout = open(outfile, 'wb')
pcap_writer = dpkt.pcap.Writer(fout, linktype=linktype)
for ts, data in pcap_reader:
if linktype == dpkt.pcap.DLT_EN10MB: # Ethernet
frame = dpkt.ethernet.Ethernet(data)
# clear MAC addresses
frame.src = b'\x00' * len(frame.src)
frame.dst = b'\x00' * len(frame.dst)
packettype = frame.type
elif linktype == dpkt.pcap.DLT_LINUX_SLL: # Linux cooked capture (SLL)
frame = dpkt.sll.SLL(data)
# clear MAC address
frame.hdr = b'\x00' * len(frame.hdr)
packettype = frame.ethtype
elif linktype == dpkt.pcap.DLT_LINUX_SLL2: # Linux cooked capture v2 (SLL2)
frame = dpkt.sll2.SLL2(data)
# clear MAC address
frame.hdr = b'\x00' * len(frame.hdr)
packettype = frame.ethtype
else:
assert False, 'Linktype {} is not implemented yet'.format(linktype)
if packettype not in (dpkt.ethernet.ETH_TYPE_IP, dpkt.ethernet.ETH_TYPE_IP6):
if packettype not in IGNORE_ETHERTYPES:
print('WARNING: skipping unknown EtherType', hex(packettype), file=sys.stderr)
print('Are you using this network protocol or did the parser fail to read ethernet frames?', file=sys.stderr)
pcap_writer.writepkt(frame, ts)
continue
# Anonymize IPv4/IPv6 address
ip = frame.data
reset_cksum = False
if anonnets is None or ip.src in anonnets:
ip.src = replace_address(ipmap, ip.src, offset, secret)
reset_cksum = True
if anonnets is None or ip.dst in anonnets:
ip.dst = replace_address(ipmap, ip.dst, offset, secret)
reset_cksum = True
# Reset IP checksums. dpkt serializer will compute new values
if reset_cksum:
ip.sum = 0
# Reset TCP/UDP checksums, if applicable
if has_transport_sum(ip):
ip.data.sum = 0
pcap_writer.writepkt(frame, ts)
fh.close()
if pcap_writer is None:
print('No input data', file=sys.stderr)
else:
pcap_writer.close()
print('Finished. Used', len(ipmap), 'IP address mappings.', file=sys.stderr)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Anonymize pcap file')
parser.add_argument('-n', '--networksfile', help='File with line-by-line list of networks to be anonymized (if unset: anonymize all)')
parser.add_argument('-o', '--offset', help='One byte offset for output IP address range (default: 238)', default=238, type=int)
parser.add_argument('-s', '--secret', help='Secret for keyed-hash mapping (default: none)')
parser.add_argument('-i', '--stdin', action='store_true', help='Read pcap input from stdin')
parser.add_argument('infiles', nargs='*', help='Input pcap filename(s)')
parser.add_argument('outfile', help='Output pcap filename')
args = parser.parse_args()
if args.infiles:
print('Reading PCAP from', len(args.infiles), 'input files.', file=sys.stderr)
else:
print('No input files given.', file=sys.stderr)
inhandles = [ open(infile, 'rb') for infile in args.infiles ]
if args.stdin:
if os.isatty(sys.stdin.buffer.fileno()):
print('ERROR: Cannot read from interactive TTY. Use piped input instead.', file=sys.stderr)
sys.exit(1)
print('Reading PCAP from pipe over stdin.', file=sys.stderr)
inhandles.append(sys.stdin.buffer)
# If reading from stdin, ignore ctrl+c (will terminate when EOF has been reached)
signal.signal(signal.SIGINT, signal.SIG_IGN)
if args.outfile:
if os.path.exists(args.outfile):
print('ERROR: Output file exists already:', args.outfile, file=sys.stderr)
sys.exit(1)
if args.networksfile:
anonnets = NetworkList(args.networksfile)
print('Loaded', len(anonnets), 'networks to be anonymized.', file=sys.stderr)
else:
anonnets = None
print('No networks file given, will anonymize all IP addresses.', file=sys.stderr)
if not 0 < args.offset < 255:
print('Offset must be 0-255, not', args.offset, file=sys.stderr)
sys.exit(1)
offset = struct.pack('B', args.offset)
if args.secret:
print('Got secret, using keyed-hash mapping.', file=sys.stderr)
secret = args.secret.encode('utf-8')
else:
print('No secret given, using first time seen mapping.', file=sys.stderr)
secret = None
main(inhandles, args.outfile, anonnets, offset, secret)