-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinet6.py
More file actions
4030 lines (3260 loc) · 150 KB
/
inet6.py
File metadata and controls
4030 lines (3260 loc) · 150 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#! /usr/bin/env python
#############################################################################
# #
# inet6.py --- IPv6 support for Scapy #
# see http://natisbad.org/IPv6/ #
# for more information #
# #
# Copyright (C) 2005 Guillaume Valadon <guedou@hongo.wide.ad.jp> #
# Arnaud Ebalard <arnaud.ebalard@eads.net> #
# #
# This program is free software; you can redistribute it and/or modify it #
# under the terms of the GNU General Public License version 2 as #
# published by the Free Software Foundation. #
# #
# This program is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
# General Public License for more details. #
# #
#############################################################################
"""
IPv6 (Internet Protocol v6).
"""
from __future__ import absolute_import
from __future__ import print_function
from hashlib import md5
import random
import socket
import struct
from time import gmtime, strftime
from scapy.all import *
# from scapy.arch import get_if_hwaddr
# from scapy.as_resolvers import AS_resolver_riswhois
# from scapy.base_classes import Gen
# from scapy.compat import chb, orb, raw, plain_str
# from scapy.config import conf
# import scapy.consts
# from scapy.data import DLT_IPV6, DLT_RAW, DLT_RAW_ALT, ETHER_ANY, ETH_P_IPV6, \
# MTU
# from scapy.error import warning
# from scapy.fields import BitEnumField, BitField, ByteEnumField, ByteField, \
# DestIP6Field, FieldLenField, FlagsField, IntField, IP6Field, \
# LongField, MACField, PacketLenField, PacketListField, ShortEnumField, \
# ShortField, SourceIP6Field, StrField, StrFixedLenField, StrLenField, \
# X3BytesField, XBitField, XIntField, XShortField
# from scapy.layers.inet import IP, IPTools, TCP, TCPerror, TracerouteResult, \
# UDP, UDPerror
# from scapy.layers.l2 import CookedLinux, Ether, GRE, Loopback, SNAP
# import scapy.modules.six as six
# from scapy.packet import bind_layers, Packet, Raw
# from scapy.sendrecv import sendp, sniff, sr, srp1
# from scapy.supersocket import SuperSocket, L3RawSocket
# from scapy.utils import checksum, strxor
# from scapy.pton_ntop import inet_pton, inet_ntop
# from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_isaddr6to4, \
# in6_isaddrllallnodes, in6_isaddrllallservers, in6_isaddrTeredo, \
# in6_isllsnmaddr, in6_ismaddr, Net6, teredoAddrExtractInfo
# from scapy.volatile import RandInt, RandShort
if not socket.has_ipv6:
raise socket.error("can't use AF_INET6, IPv6 is disabled")
if not hasattr(socket, "IPPROTO_IPV6"):
# Workaround for http://bugs.python.org/issue6926
socket.IPPROTO_IPV6 = 41
if not hasattr(socket, "IPPROTO_IPIP"):
# Workaround for https://bitbucket.org/secdev/scapy/issue/5119
socket.IPPROTO_IPIP = 4
if conf.route6 is None:
# unused import, only to initialize conf.route6
import scapy.route6
##########################
# Neighbor cache stuff #
##########################
conf.netcache.new_cache("in6_neighbor", 120)
@conf.commands.register
def neighsol(addr, src, iface, timeout=1, chainCC=0):
"""Sends and receive an ICMPv6 Neighbor Solicitation message
This function sends an ICMPv6 Neighbor Solicitation message
to get the MAC address of the neighbor with specified IPv6 address address.
'src' address is used as source of the message. Message is sent on iface.
By default, timeout waiting for an answer is 1 second.
If no answer is gathered, None is returned. Else, the answer is
returned (ethernet frame).
"""
nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr))
d = inet_ntop(socket.AF_INET6, nsma)
dm = in6_getnsmac(nsma)
p = Ether(dst=dm) / IPv6(dst=d, src=src, hlim=255)
p /= ICMPv6ND_NS(tgt=addr)
p /= ICMPv6NDOptSrcLLAddr(lladdr=get_if_hwaddr(iface))
res = srp1(p, type=ETH_P_IPV6, iface=iface, timeout=1, verbose=0,
chainCC=chainCC)
return res
@conf.commands.register
def getmacbyip6(ip6, chainCC=0):
"""Returns the MAC address corresponding to an IPv6 address
neighborCache.get() method is used on instantiated neighbor cache.
Resolution mechanism is described in associated doc string.
(chainCC parameter value ends up being passed to sending function
used to perform the resolution, if needed)
"""
if isinstance(ip6, Net6):
ip6 = str(ip6)
if in6_ismaddr(ip6): # Multicast
mac = in6_getnsmac(inet_pton(socket.AF_INET6, ip6))
return mac
iff, a, nh = conf.route6.route(ip6)
if iff == scapy.consts.LOOPBACK_INTERFACE:
return "ff:ff:ff:ff:ff:ff"
if nh != '::':
ip6 = nh # Found next hop
mac = conf.netcache.in6_neighbor.get(ip6)
if mac:
return mac
res = neighsol(ip6, a, iff, chainCC=chainCC)
if res is not None:
if ICMPv6NDOptDstLLAddr in res:
mac = res[ICMPv6NDOptDstLLAddr].lladdr
else:
mac = res.src
conf.netcache.in6_neighbor[ip6] = mac
return mac
return None
#############################################################################
#############################################################################
# IPv6 Class #
#############################################################################
#############################################################################
ipv6nh = {0: "Hop-by-Hop Option Header",
4: "IP",
6: "TCP",
17: "UDP",
41: "IPv6",
43: "Routing Header",
44: "Fragment Header",
47: "GRE",
50: "ESP Header",
51: "AH Header",
58: "ICMPv6",
59: "No Next Header",
60: "Destination Option Header",
112: "VRRP",
132: "SCTP",
135: "Mobility Header"}
ipv6nhcls = {0: "IPv6ExtHdrHopByHop",
4: "IP",
6: "TCP",
17: "UDP",
43: "IPv6ExtHdrRouting",
44: "IPv6ExtHdrFragment",
# 50: "IPv6ExtHrESP",
# 51: "IPv6ExtHdrAH",
58: "ICMPv6Unknown",
59: "Raw",
60: "IPv6ExtHdrDestOpt"}
class IP6ListField(StrField):
__slots__ = ["count_from", "length_from"]
islist = 1
def __init__(self, name, default, count_from=None, length_from=None):
if default is None:
default = []
StrField.__init__(self, name, default)
self.count_from = count_from
self.length_from = length_from
def i2len(self, pkt, i):
return 16 * len(i)
def i2count(self, pkt, i):
if isinstance(i, list):
return len(i)
return 0
def getfield(self, pkt, s):
c = tmp_len = None
if self.length_from is not None:
tmp_len = self.length_from(pkt)
elif self.count_from is not None:
c = self.count_from(pkt)
lst = []
ret = b""
remain = s
if tmp_len is not None:
remain, ret = s[:tmp_len], s[tmp_len:]
while remain:
if c is not None:
if c <= 0:
break
c -= 1
addr = inet_ntop(socket.AF_INET6, remain[:16])
lst.append(addr)
remain = remain[16:]
return remain + ret, lst
def i2m(self, pkt, x):
s = b""
for y in x:
try:
y = inet_pton(socket.AF_INET6, y)
except Exception:
y = socket.getaddrinfo(y, None, socket.AF_INET6)[0][-1][0]
y = inet_pton(socket.AF_INET6, y)
s += y
return s
def i2repr(self, pkt, x):
s = []
if x is None:
return "[]"
for y in x:
s.append('%s' % y)
return "[ %s ]" % (", ".join(s))
class _IPv6GuessPayload:
name = "Dummy class that implements guess_payload_class() for IPv6"
def default_payload_class(self, p):
if self.nh == 58: # ICMPv6
t = orb(p[0])
if len(p) > 2 and (t == 139 or t == 140): # Node Info Query
return _niquery_guesser(p)
if len(p) >= icmp6typesminhdrlen.get(t, float("inf")): # Other ICMPv6 messages # noqa: E501
if t == 130 and len(p) >= 28:
# RFC 3810 - 8.1. Query Version Distinctions
return ICMPv6MLQuery2
return icmp6typescls.get(t, Raw)
return Raw
elif self.nh == 135 and len(p) > 3: # Mobile IPv6
return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic)
elif self.nh == 43 and orb(p[2]) == 4: # Segment Routing header
return IPv6ExtHdrSegmentRouting
return ipv6nhcls.get(self.nh, Raw)
class IPv6(_IPv6GuessPayload, Packet, IPTools):
name = "IPv6"
fields_desc = [BitField("version", 6, 4),
BitField("tc", 0, 8), # TODO: IPv6, ByteField ?
BitField("fl", 0, 20),
ShortField("plen", None),
ByteEnumField("nh", 59, ipv6nh),
ByteField("hlim", 64),
SourceIP6Field("src", "dst"), # dst is for src @ selection
DestIP6Field("dst", "::1")]
def route(self):
"""Used to select the L2 address"""
dst = self.dst
if isinstance(dst, Gen):
dst = next(iter(dst))
return conf.route6.route(dst)
def mysummary(self):
return "%s > %s (%i)" % (self.src, self.dst, self.nh)
def post_build(self, p, pay):
p += pay
if self.plen is None:
tmp_len = len(p) - 40
p = p[:4] + struct.pack("!H", tmp_len) + p[6:]
return p
def extract_padding(self, data):
"""Extract the IPv6 payload"""
if self.plen == 0 and self.nh == 0 and len(data) >= 8:
# Extract Hop-by-Hop extension length
hbh_len = orb(data[1])
hbh_len = 8 + hbh_len * 8
# Extract length from the Jumbogram option
# Note: the following algorithm take advantage of the Jumbo option
# mandatory alignment (4n + 2, RFC2675 Section 2)
jumbo_len = None
idx = 0
offset = 4 * idx + 2
while offset <= len(data):
opt_type = orb(data[offset])
if opt_type == 0xc2: # Jumbo option
jumbo_len = struct.unpack("I", data[offset + 2:offset + 2 + 4])[0] # noqa: E501
break
offset = 4 * idx + 2
idx += 1
if jumbo_len is None:
warning("Scapy did not find a Jumbo option")
jumbo_len = 0
tmp_len = hbh_len + jumbo_len
else:
tmp_len = self.plen
return data[:tmp_len], data[tmp_len:]
def hashret(self):
if self.nh == 58 and isinstance(self.payload, _ICMPv6):
if self.payload.type < 128:
return self.payload.payload.hashret()
elif (self.payload.type in [133, 134, 135, 136, 144, 145]):
return struct.pack("B", self.nh) + self.payload.hashret()
if not conf.checkIPinIP and self.nh in [4, 41]: # IP, IPv6
return self.payload.hashret()
nh = self.nh
sd = self.dst
ss = self.src
if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting):
# With routing header, the destination is the last
# address of the IPv6 list if segleft > 0
nh = self.payload.nh
try:
sd = self.addresses[-1]
except IndexError:
sd = '::1'
# TODO: big bug with ICMPv6 error messages as the destination of IPerror6 # noqa: E501
# could be anything from the original list ...
if 1:
sd = inet_pton(socket.AF_INET6, sd)
for a in self.addresses:
a = inet_pton(socket.AF_INET6, a)
sd = strxor(sd, a)
sd = inet_ntop(socket.AF_INET6, sd)
if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting): # noqa: E501
# With segment routing header (rh == 4), the destination is
# the first address of the IPv6 addresses list
try:
sd = self.addresses[0]
except IndexError:
sd = self.dst
if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment):
nh = self.payload.nh
if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop):
nh = self.payload.nh
if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):
foundhao = None
for o in self.payload.options:
if isinstance(o, HAO):
foundhao = o
if foundhao:
nh = self.payload.nh # XXX what if another extension follows ?
ss = foundhao.hoa
if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd):
sd = inet_pton(socket.AF_INET6, sd)
ss = inet_pton(socket.AF_INET6, self.src)
return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret() # noqa: E501
else:
return struct.pack("B", nh) + self.payload.hashret()
def answers(self, other):
if not conf.checkIPinIP: # skip IP in IP and IPv6 in IP
if self.nh in [4, 41]:
return self.payload.answers(other)
if isinstance(other, IPv6) and other.nh in [4, 41]:
return self.answers(other.payload)
if isinstance(other, IP) and other.proto in [4, 41]:
return self.answers(other.payload)
if not isinstance(other, IPv6): # self is reply, other is request
return False
if conf.checkIPaddr:
# ss = inet_pton(socket.AF_INET6, self.src)
sd = inet_pton(socket.AF_INET6, self.dst)
os = inet_pton(socket.AF_INET6, other.src)
od = inet_pton(socket.AF_INET6, other.dst)
# request was sent to a multicast address (other.dst)
# Check reply destination addr matches request source addr (i.e
# sd == os) except when reply is multicasted too
# XXX test mcast scope matching ?
if in6_ismaddr(other.dst):
if in6_ismaddr(self.dst):
if ((od == sd) or
(in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))): # noqa: E501
return self.payload.answers(other.payload)
return False
if (os == sd):
return self.payload.answers(other.payload)
return False
elif (sd != os): # or ss != od): <- removed for ICMP errors
return False
if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128: # noqa: E501
# ICMPv6 Error message -> generated by IPv6 packet
# Note : at the moment, we jump the ICMPv6 specific class
# to call answers() method of erroneous packet (over
# initial packet). There can be cases where an ICMPv6 error
# class could implement a specific answers method that perform
# a specific task. Currently, don't see any use ...
return self.payload.payload.answers(other)
elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop):
return self.payload.answers(other.payload)
elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment):
return self.payload.answers(other.payload.payload)
elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting):
return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting # noqa: E501
elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting): # noqa: E501
return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting # noqa: E501
elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt):
return self.payload.payload.answers(other.payload.payload)
elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt): # BU in reply to BRR, for instance # noqa: E501
return self.payload.payload.answers(other.payload)
else:
if (self.nh != other.nh):
return False
return self.payload.answers(other.payload)
class _IPv46(IP):
"""
This class implements a dispatcher that is used to detect the IP version
while parsing Raw IP pcap files.
"""
@classmethod
def dispatch_hook(cls, _pkt=None, *_, **kargs):
if _pkt:
if orb(_pkt[0]) >> 4 == 6:
return IPv6
elif kargs.get("version") == 6:
return IPv6
return IP
def inet6_register_l3(l2, l3):
return getmacbyip6(l3.dst)
conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3)
class IPerror6(IPv6):
name = "IPv6 in ICMPv6"
def answers(self, other):
if not isinstance(other, IPv6):
return False
sd = inet_pton(socket.AF_INET6, self.dst)
ss = inet_pton(socket.AF_INET6, self.src)
od = inet_pton(socket.AF_INET6, other.dst)
os = inet_pton(socket.AF_INET6, other.src)
# Make sure that the ICMPv6 error is related to the packet scapy sent
if isinstance(self.underlayer, _ICMPv6) and self.underlayer.type < 128:
# find upper layer for self (possible citation)
selfup = self.payload
while selfup is not None and isinstance(selfup, _IPv6ExtHdr):
selfup = selfup.payload
# find upper layer for other (initial packet). Also look for RH
otherup = other.payload
request_has_rh = False
while otherup is not None and isinstance(otherup, _IPv6ExtHdr):
if isinstance(otherup, IPv6ExtHdrRouting):
request_has_rh = True
otherup = otherup.payload
if ((ss == os and sd == od) or # <- Basic case
(ss == os and request_has_rh)): # <- Request has a RH :
# don't check dst address
# Let's deal with possible MSS Clamping
if (isinstance(selfup, TCP) and
isinstance(otherup, TCP) and
selfup.options != otherup.options): # seems clamped
# Save fields modified by MSS clamping
old_otherup_opts = otherup.options
old_otherup_cksum = otherup.chksum
old_otherup_dataofs = otherup.dataofs
old_selfup_opts = selfup.options
old_selfup_cksum = selfup.chksum
old_selfup_dataofs = selfup.dataofs
# Nullify them
otherup.options = []
otherup.chksum = 0
otherup.dataofs = 0
selfup.options = []
selfup.chksum = 0
selfup.dataofs = 0
# Test it and save result
s1 = raw(selfup)
s2 = raw(otherup)
tmp_len = min(len(s1), len(s2))
res = s1[:tmp_len] == s2[:tmp_len]
# recall saved values
otherup.options = old_otherup_opts
otherup.chksum = old_otherup_cksum
otherup.dataofs = old_otherup_dataofs
selfup.options = old_selfup_opts
selfup.chksum = old_selfup_cksum
selfup.dataofs = old_selfup_dataofs
return res
s1 = raw(selfup)
s2 = raw(otherup)
tmp_len = min(len(s1), len(s2))
return s1[:tmp_len] == s2[:tmp_len]
return False
def mysummary(self):
return Packet.mysummary(self)
#############################################################################
#############################################################################
# Upper Layer Checksum computation #
#############################################################################
#############################################################################
class PseudoIPv6(Packet): # IPv6 Pseudo-header for checksum computation
name = "Pseudo IPv6 Header"
fields_desc = [IP6Field("src", "::"),
IP6Field("dst", "::"),
ShortField("uplen", None),
BitField("zero", 0, 24),
ByteField("nh", 0)]
def in6_chksum(nh, u, p):
"""
As Specified in RFC 2460 - 8.1 Upper-Layer Checksums
Performs IPv6 Upper Layer checksum computation. Provided parameters are:
- 'nh' : value of upper layer protocol
- 'u' : upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
provided with all under layers (IPv6 and all extension headers,
for example)
- 'p' : the payload of the upper layer provided as a string
Functions operate by filling a pseudo header class instance (PseudoIPv6)
with
- Next Header value
- the address of _final_ destination (if some Routing Header with non
segleft field is present in underlayer classes, last address is used.)
- the address of _real_ source (basically the source address of an
IPv6 class instance available in the underlayer or the source address
in HAO option if some Destination Option header found in underlayer
includes this option).
- the length is the length of provided payload string ('p')
"""
ph6 = PseudoIPv6()
ph6.nh = nh
rthdr = 0
hahdr = 0
final_dest_addr_found = 0
while u is not None and not isinstance(u, IPv6):
if (isinstance(u, IPv6ExtHdrRouting) and
u.segleft != 0 and len(u.addresses) != 0 and
final_dest_addr_found == 0):
rthdr = u.addresses[-1]
final_dest_addr_found = 1
elif (isinstance(u, IPv6ExtHdrSegmentRouting) and
u.segleft != 0 and len(u.addresses) != 0 and
final_dest_addr_found == 0):
rthdr = u.addresses[0]
final_dest_addr_found = 1
elif (isinstance(u, IPv6ExtHdrDestOpt) and (len(u.options) == 1) and
isinstance(u.options[0], HAO)):
hahdr = u.options[0].hoa
u = u.underlayer
if u is None:
warning("No IPv6 underlayer to compute checksum. Leaving null.")
return 0
if hahdr:
ph6.src = hahdr
else:
ph6.src = u.src
if rthdr:
ph6.dst = rthdr
else:
ph6.dst = u.dst
ph6.uplen = len(p)
ph6s = raw(ph6)
return checksum(ph6s + p)
#############################################################################
#############################################################################
# Extension Headers #
#############################################################################
#############################################################################
# Inherited by all extension header classes
class _IPv6ExtHdr(_IPv6GuessPayload, Packet):
name = 'Abstract IPv6 Option Header'
aliastypes = [IPv6, IPerror6] # TODO ...
# IPv6 options for Extension Headers #
_hbhopts = {0x00: "Pad1",
0x01: "PadN",
0x04: "Tunnel Encapsulation Limit",
0x05: "Router Alert",
0x06: "Quick-Start",
0xc2: "Jumbo Payload",
0xc9: "Home Address Option"}
class _OTypeField(ByteEnumField):
"""
Modified BytEnumField that displays information regarding the IPv6 option
based on its option type value (What should be done by nodes that process
the option if they do not understand it ...)
It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options
"""
pol = {0x00: "00: skip",
0x40: "01: discard",
0x80: "10: discard+ICMP",
0xC0: "11: discard+ICMP not mcast"}
enroutechange = {0x00: "0: Don't change en-route",
0x20: "1: May change en-route"}
def i2repr(self, pkt, x):
s = self.i2s.get(x, repr(x))
polstr = self.pol[(x & 0xC0)]
enroutechangestr = self.enroutechange[(x & 0x20)]
return "%s [%s, %s]" % (s, polstr, enroutechangestr)
class HBHOptUnknown(Packet): # IPv6 Hop-By-Hop Option
name = "Scapy6 Unknown Option"
fields_desc = [_OTypeField("otype", 0x01, _hbhopts),
FieldLenField("optlen", None, length_of="optdata", fmt="B"),
StrLenField("optdata", "",
length_from=lambda pkt: pkt.optlen)]
def alignment_delta(self, curpos): # By default, no alignment requirement
"""
As specified in section 4.2 of RFC 2460, every options has
an alignment requirement usually expressed xn+y, meaning
the Option Type must appear at an integer multiple of x octest
from the start of the header, plus y octet.
That function is provided the current position from the
start of the header and returns required padding length.
"""
return 0
@classmethod
def dispatch_hook(cls, _pkt=None, *args, **kargs):
if _pkt:
o = orb(_pkt[0]) # Option type
if o in _hbhoptcls:
return _hbhoptcls[o]
return cls
def extract_padding(self, p):
return b"", p
class Pad1(Packet): # IPv6 Hop-By-Hop Option
name = "Pad1"
fields_desc = [_OTypeField("otype", 0x00, _hbhopts)]
def alignment_delta(self, curpos): # No alignment requirement
return 0
def extract_padding(self, p):
return b"", p
class PadN(Packet): # IPv6 Hop-By-Hop Option
name = "PadN"
fields_desc = [_OTypeField("otype", 0x01, _hbhopts),
FieldLenField("optlen", None, length_of="optdata", fmt="B"),
StrLenField("optdata", "",
length_from=lambda pkt: pkt.optlen)]
def alignment_delta(self, curpos): # No alignment requirement
return 0
def extract_padding(self, p):
return b"", p
class RouterAlert(Packet): # RFC 2711 - IPv6 Hop-By-Hop Option
name = "Router Alert"
fields_desc = [_OTypeField("otype", 0x05, _hbhopts),
ByteField("optlen", 2),
ShortEnumField("value", None,
{0: "Datagram contains a MLD message",
1: "Datagram contains RSVP message",
2: "Datagram contains an Active Network message", # noqa: E501
68: "NSIS NATFW NSLP",
69: "MPLS OAM",
65535: "Reserved"})]
# TODO : Check IANA has not defined new values for value field of RouterAlertOption # noqa: E501
# TODO : Now that we have that option, we should do something in MLD class that need it # noqa: E501
# TODO : IANA has defined ranges of values which can't be easily represented here. # noqa: E501
# iana.org/assignments/ipv6-routeralert-values/ipv6-routeralert-values.xhtml
def alignment_delta(self, curpos): # alignment requirement : 2n+0
x = 2
y = 0
delta = x * ((curpos - y + x - 1) // x) + y - curpos
return delta
def extract_padding(self, p):
return b"", p
class Jumbo(Packet): # IPv6 Hop-By-Hop Option
name = "Jumbo Payload"
fields_desc = [_OTypeField("otype", 0xC2, _hbhopts),
ByteField("optlen", 4),
IntField("jumboplen", None)]
def alignment_delta(self, curpos): # alignment requirement : 4n+2
x = 4
y = 2
delta = x * ((curpos - y + x - 1) // x) + y - curpos
return delta
def extract_padding(self, p):
return b"", p
class HAO(Packet): # IPv6 Destination Options Header Option
name = "Home Address Option"
fields_desc = [_OTypeField("otype", 0xC9, _hbhopts),
ByteField("optlen", 16),
IP6Field("hoa", "::")]
def alignment_delta(self, curpos): # alignment requirement : 8n+6
x = 8
y = 6
delta = x * ((curpos - y + x - 1) // x) + y - curpos
return delta
def extract_padding(self, p):
return b"", p
_hbhoptcls = {0x00: Pad1,
0x01: PadN,
0x05: RouterAlert,
0xC2: Jumbo,
0xC9: HAO}
# Hop-by-Hop Extension Header #
class _OptionsField(PacketListField):
__slots__ = ["curpos"]
def __init__(self, name, default, cls, curpos, *args, **kargs):
self.curpos = curpos
PacketListField.__init__(self, name, default, cls, *args, **kargs)
def i2len(self, pkt, i):
return len(self.i2m(pkt, i))
def i2m(self, pkt, x):
autopad = None
try:
autopad = getattr(pkt, "autopad") # Hack : 'autopad' phantom field
except Exception:
autopad = 1
if not autopad:
return b"".join(map(str, x))
curpos = self.curpos
s = b""
for p in x:
d = p.alignment_delta(curpos)
curpos += d
if d == 1:
s += raw(Pad1())
elif d != 0:
s += raw(PadN(optdata=b'\x00' * (d - 2)))
pstr = raw(p)
curpos += len(pstr)
s += pstr
# Let's make the class including our option field
# a multiple of 8 octets long
d = curpos % 8
if d == 0:
return s
d = 8 - d
if d == 1:
s += raw(Pad1())
elif d != 0:
s += raw(PadN(optdata=b'\x00' * (d - 2)))
return s
def addfield(self, pkt, s, val):
return s + self.i2m(pkt, val)
class _PhantomAutoPadField(ByteField):
def addfield(self, pkt, s, val):
return s
def getfield(self, pkt, s):
return s, 1
def i2repr(self, pkt, x):
if x:
return "On"
return "Off"
class IPv6ExtHdrHopByHop(_IPv6ExtHdr):
name = "IPv6 Extension Header - Hop-by-Hop Options Header"
fields_desc = [ByteEnumField("nh", 59, ipv6nh),
FieldLenField("len", None, length_of="options", fmt="B",
adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1),
_PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
_OptionsField("options", [], HBHOptUnknown, 2,
length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)] # noqa: E501
overload_fields = {IPv6: {"nh": 0}}
# Destination Option Header #
class IPv6ExtHdrDestOpt(_IPv6ExtHdr):
name = "IPv6 Extension Header - Destination Options Header"
fields_desc = [ByteEnumField("nh", 59, ipv6nh),
FieldLenField("len", None, length_of="options", fmt="B",
adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1),
_PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
_OptionsField("options", [], HBHOptUnknown, 2,
length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)] # noqa: E501
overload_fields = {IPv6: {"nh": 60}}
# Routing Header #
class IPv6ExtHdrRouting(_IPv6ExtHdr):
name = "IPv6 Option Header Routing"
fields_desc = [ByteEnumField("nh", 59, ipv6nh),
FieldLenField("len", None, count_of="addresses", fmt="B",
adjust=lambda pkt, x:2 * x), # in 8 bytes blocks # noqa: E501
ByteField("type", 0),
ByteField("segleft", None),
BitField("reserved", 0, 32), # There is meaning in this field ... # noqa: E501
IP6ListField("addresses", [],
length_from=lambda pkt: 8 * pkt.len)]
overload_fields = {IPv6: {"nh": 43}}
def post_build(self, pkt, pay):
if self.segleft is None:
pkt = pkt[:3] + struct.pack("B", len(self.addresses)) + pkt[4:]
return _IPv6ExtHdr.post_build(self, pkt, pay)
# Segment Routing Header #
# This implementation is based on draft 06, available at:
# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06
class IPv6ExtHdrSegmentRoutingTLV(Packet):
name = "IPv6 Option Header Segment Routing - Generic TLV"
fields_desc = [ByteField("type", 0),
ByteField("len", 0),
ByteField("reserved", 0),
ByteField("flags", 0),
StrLenField("value", "", length_from=lambda pkt: pkt.len)]
def extract_padding(self, p):
return b"", p
registered_sr_tlv = {}
@classmethod
def register_variant(cls):
cls.registered_sr_tlv[cls.type.default] = cls
@classmethod
def dispatch_hook(cls, pkt=None, *args, **kargs):
if pkt:
tmp_type = orb(pkt[0])
return cls.registered_sr_tlv.get(tmp_type, cls)
return cls
class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV):
name = "IPv6 Option Header Segment Routing - Ingress Node TLV"
fields_desc = [ByteField("type", 1),
ByteField("len", 18),
ByteField("reserved", 0),
ByteField("flags", 0),
IP6Field("ingress_node", "::1")]
class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV):
name = "IPv6 Option Header Segment Routing - Egress Node TLV"
fields_desc = [ByteField("type", 2),
ByteField("len", 18),
ByteField("reserved", 0),
ByteField("flags", 0),
IP6Field("egress_node", "::1")]
class IPv6ExtHdrSegmentRoutingTLVPadding(IPv6ExtHdrSegmentRoutingTLV):
name = "IPv6 Option Header Segment Routing - Padding TLV"
fields_desc = [ByteField("type", 4),
FieldLenField("len", None, length_of="padding", fmt="B"),
StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)] # noqa: E501
class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr):
name = "IPv6 Option Header Segment Routing"
fields_desc = [ByteEnumField("nh", 59, ipv6nh),
ByteField("len", None),
ByteField("type", 4),
ByteField("segleft", None),
ByteField("lastentry", None),
BitField("unused1", 0, 1),
BitField("protected", 0, 1),
BitField("oam", 0, 1),
BitField("alert", 0, 1),
BitField("hmac", 0, 1),
BitField("unused2", 0, 3),
ShortField("tag", 0),
IP6ListField("addresses", ["::1"],
count_from=lambda pkt: pkt.lastentry),
PacketListField("tlv_objects", [], IPv6ExtHdrSegmentRoutingTLV, # noqa: E501
length_from=lambda pkt: 8 * pkt.len - 16 * pkt.lastentry)] # noqa: E501
overload_fields = {IPv6: {"nh": 43}}
def post_build(self, pkt, pay):
if self.len is None:
# The extension must be align on 8 bytes
tmp_mod = (len(pkt) - 8) % 8
if tmp_mod == 1:
warning("IPv6ExtHdrSegmentRouting(): can't pad 1 byte !")
elif tmp_mod >= 2:
# Add the padding extension
tmp_pad = b"\x00" * (tmp_mod - 2)
tlv = IPv6ExtHdrSegmentRoutingTLVPadding(padding=tmp_pad)
pkt += raw(tlv)
tmp_len = (len(pkt) - 8) // 8
pkt = pkt[:1] + struct.pack("B", tmp_len) + pkt[2:]
if self.segleft is None:
tmp_len = len(self.addresses)
if tmp_len:
tmp_len -= 1
pkt = pkt[:3] + struct.pack("B", tmp_len) + pkt[4:]
if self.lastentry is None:
pkt = pkt[:4] + struct.pack("B", len(self.addresses)) + pkt[5:]